Skip to content

Commit

Permalink
[Backport 2.17] Optimize NodeIndicesStats output behind flag (opensea…
Browse files Browse the repository at this point in the history
…rch-project#15678)

* Optimize NodeIndicesStats output behind flag (opensearch-project#14454)

Signed-off-by: Pranshu Shukla <[email protected]>
(cherry picked from commit e146f13)
  • Loading branch information
Pranshu-S authored Sep 5, 2024
1 parent cea8eea commit bf946d9
Show file tree
Hide file tree
Showing 8 changed files with 904 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add canRemain method to TargetPoolAllocationDecider to move shards from local to remote pool for hot to warm tiering ([#15010](https://github.com/opensearch-project/OpenSearch/pull/15010))
- Add support for pluggable deciders for concurrent search ([#15363](https://github.com/opensearch-project/OpenSearch/pull/15363))
- [Remote Publication] Added checksum validation for cluster state behind a cluster setting ([#15218](https://github.com/opensearch-project/OpenSearch/pull/15218))
- Optimize NodeIndicesStats output behind flag ([#14454](https://github.com/opensearch-project/OpenSearch/pull/14454))
- Add support for comma-separated list of index names to be used with Snapshot Status API ([#15409](https://github.com/opensearch-project/OpenSearch/pull/15409))[SnapshotV2] Snapshot Status API changes (#15409))
- Relax the join validation for Remote State publication ([#15471](https://github.com/opensearch-project/OpenSearch/pull/15471))

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class CommonStatsFlags implements Writeable, Cloneable {
// Used for metric CACHE_STATS, to determine which caches to report stats for
private EnumSet<CacheType> includeCaches = EnumSet.noneOf(CacheType.class);
private String[] levels = new String[0];
private boolean includeIndicesStatsByLevel = false;

/**
* @param flags flags to set. If no flags are supplied, default flags will be set.
Expand Down Expand Up @@ -106,6 +107,9 @@ public CommonStatsFlags(StreamInput in) throws IOException {
includeCaches = in.readEnumSet(CacheType.class);
levels = in.readStringArray();
}
if (in.getVersion().onOrAfter(Version.V_2_17_0)) {
includeIndicesStatsByLevel = in.readBoolean();
}
}

@Override
Expand Down Expand Up @@ -135,6 +139,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeEnumSet(includeCaches);
out.writeStringArrayNullable(levels);
}
if (out.getVersion().onOrAfter(Version.V_2_17_0)) {
out.writeBoolean(includeIndicesStatsByLevel);
}
}

/**
Expand Down Expand Up @@ -273,6 +280,14 @@ public boolean includeSegmentFileSizes() {
return this.includeSegmentFileSizes;
}

public void setIncludeIndicesStatsByLevel(boolean includeIndicesStatsByLevel) {
this.includeIndicesStatsByLevel = includeIndicesStatsByLevel;
}

public boolean getIncludeIndicesStatsByLevel() {
return this.includeIndicesStatsByLevel;
}

public boolean isSet(Flag flag) {
return flags.contains(flag);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -758,8 +758,12 @@ public NodeIndicesStats stats(CommonStatsFlags flags) {
break;
}
}

return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats);
if (flags.getIncludeIndicesStatsByLevel()) {
NodeIndicesStats.StatsLevel statsLevel = NodeIndicesStats.getAcceptedLevel(flags.getLevels());
return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats, statsLevel);
} else {
return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats);
}
}

Map<Index, List<IndexShardStats>> statsByShard(final IndicesService indicesService, final CommonStatsFlags flags) {
Expand Down
199 changes: 171 additions & 28 deletions server/src/main/java/org/opensearch/indices/NodeIndicesStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.indices;

import org.opensearch.Version;
import org.opensearch.action.admin.indices.stats.CommonStats;
import org.opensearch.action.admin.indices.stats.IndexShardStats;
import org.opensearch.action.admin.indices.stats.ShardStats;
Expand Down Expand Up @@ -63,9 +64,11 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* Global information on indices stats running on a specific node.
Expand All @@ -74,26 +77,27 @@
*/
@PublicApi(since = "1.0.0")
public class NodeIndicesStats implements Writeable, ToXContentFragment {
private CommonStats stats;
private Map<Index, List<IndexShardStats>> statsByShard;
protected CommonStats stats;
protected Map<Index, CommonStats> statsByIndex;
protected Map<Index, List<IndexShardStats>> statsByShard;

public NodeIndicesStats(StreamInput in) throws IOException {
stats = new CommonStats(in);
if (in.readBoolean()) {
int entries = in.readVInt();
statsByShard = new HashMap<>();
for (int i = 0; i < entries; i++) {
Index index = new Index(in);
int indexShardListSize = in.readVInt();
List<IndexShardStats> indexShardStats = new ArrayList<>(indexShardListSize);
for (int j = 0; j < indexShardListSize; j++) {
indexShardStats.add(new IndexShardStats(in));
}
statsByShard.put(index, indexShardStats);
if (in.getVersion().onOrAfter(Version.V_2_17_0)) {
// contains statsByIndex
if (in.readBoolean()) {
statsByIndex = readStatsByIndex(in);
}
}
if (in.readBoolean()) {
statsByShard = readStatsByShard(in);
}
}

/**
* Without passing the information of the levels to the constructor, we return the Node-level aggregated stats as
* {@link CommonStats} along with a hash-map containing Index to List of Shard Stats.
*/
public NodeIndicesStats(CommonStats oldStats, Map<Index, List<IndexShardStats>> statsByShard, SearchRequestStats searchRequestStats) {
// this.stats = stats;
this.statsByShard = statsByShard;
Expand All @@ -112,6 +116,90 @@ public NodeIndicesStats(CommonStats oldStats, Map<Index, List<IndexShardStats>>
}
}

/**
* Passing the level information to the nodes allows us to aggregate the stats based on the level passed. This
* allows us to aggregate based on NodeLevel (default - if no level is passed) or Index level if `indices` level is
* passed and finally return the statsByShards map if `shards` level is passed. This allows us to reduce ser/de of
* stats and return only the information that is required while returning to the client.
*/
public NodeIndicesStats(
CommonStats oldStats,
Map<Index, List<IndexShardStats>> statsByShard,
SearchRequestStats searchRequestStats,
StatsLevel level
) {
// make a total common stats from old ones and current ones
this.stats = oldStats;
for (List<IndexShardStats> shardStatsList : statsByShard.values()) {
for (IndexShardStats indexShardStats : shardStatsList) {
for (ShardStats shardStats : indexShardStats.getShards()) {
stats.add(shardStats.getStats());
}
}
}

if (this.stats.search != null) {
this.stats.search.setSearchRequestStats(searchRequestStats);
}

if (level != null) {
switch (level) {
case INDICES:
this.statsByIndex = createStatsByIndex(statsByShard);
break;
case SHARDS:
this.statsByShard = statsByShard;
break;
}
}
}

/**
* By default, the levels passed from the transport action will be a list of strings, since NodeIndicesStats can
* only aggregate on one level, we pick the first accepted level else we ignore if no known level is passed. Level is
* selected based on enum defined in {@link StatsLevel}
*
* Note - we are picking the first level as multiple levels are not supported in the previous versions.
* @param levels - levels sent in the request.
*
* @return Corresponding identified enum {@link StatsLevel}
*/
public static StatsLevel getAcceptedLevel(String[] levels) {
if (levels != null && levels.length > 0) {
Optional<StatsLevel> level = Arrays.stream(StatsLevel.values())
.filter(field -> field.getRestName().equals(levels[0]))
.findFirst();
return level.orElseThrow(() -> new IllegalArgumentException("Level provided is not supported by NodeIndicesStats"));
}
return null;
}

private Map<Index, CommonStats> readStatsByIndex(StreamInput in) throws IOException {
Map<Index, CommonStats> statsByIndex = new HashMap<>();
int indexEntries = in.readVInt();
for (int i = 0; i < indexEntries; i++) {
Index index = new Index(in);
CommonStats commonStats = new CommonStats(in);
statsByIndex.put(index, commonStats);
}
return statsByIndex;
}

private Map<Index, List<IndexShardStats>> readStatsByShard(StreamInput in) throws IOException {
Map<Index, List<IndexShardStats>> statsByShard = new HashMap<>();
int entries = in.readVInt();
for (int i = 0; i < entries; i++) {
Index index = new Index(in);
int indexShardListSize = in.readVInt();
List<IndexShardStats> indexShardStats = new ArrayList<>(indexShardListSize);
for (int j = 0; j < indexShardListSize; j++) {
indexShardStats.add(new IndexShardStats(in));
}
statsByShard.put(index, indexShardStats);
}
return statsByShard;
}

@Nullable
public StoreStats getStore() {
return stats.getStore();
Expand Down Expand Up @@ -195,7 +283,31 @@ public RecoveryStats getRecoveryStats() {
@Override
public void writeTo(StreamOutput out) throws IOException {
stats.writeTo(out);

if (out.getVersion().onOrAfter(Version.V_2_17_0)) {
out.writeBoolean(statsByIndex != null);
if (statsByIndex != null) {
writeStatsByIndex(out);
}
}

out.writeBoolean(statsByShard != null);
if (statsByShard != null) {
writeStatsByShards(out);
}
}

private void writeStatsByIndex(StreamOutput out) throws IOException {
if (statsByIndex != null) {
out.writeVInt(statsByIndex.size());
for (Map.Entry<Index, CommonStats> entry : statsByIndex.entrySet()) {
entry.getKey().writeTo(out);
entry.getValue().writeTo(out);
}
}
}

private void writeStatsByShards(StreamOutput out) throws IOException {
if (statsByShard != null) {
out.writeVInt(statsByShard.size());
for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShard.entrySet()) {
Expand All @@ -210,29 +322,46 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
final String level = params.param("level", "node");
final boolean isLevelValid = "indices".equalsIgnoreCase(level)
|| "node".equalsIgnoreCase(level)
|| "shards".equalsIgnoreCase(level);
final String level = params.param("level", StatsLevel.NODE.getRestName());
final boolean isLevelValid = StatsLevel.NODE.getRestName().equalsIgnoreCase(level)
|| StatsLevel.INDICES.getRestName().equalsIgnoreCase(level)
|| StatsLevel.SHARDS.getRestName().equalsIgnoreCase(level);
if (!isLevelValid) {
throw new IllegalArgumentException("level parameter must be one of [indices] or [node] or [shards] but was [" + level + "]");
throw new IllegalArgumentException(
"level parameter must be one of ["
+ StatsLevel.INDICES.getRestName()
+ "] or ["
+ StatsLevel.NODE.getRestName()
+ "] or ["
+ StatsLevel.SHARDS.getRestName()
+ "] but was ["
+ level
+ "]"
);
}

// "node" level
builder.startObject(Fields.INDICES);
builder.startObject(StatsLevel.INDICES.getRestName());
stats.toXContent(builder, params);

if ("indices".equals(level)) {
Map<Index, CommonStats> indexStats = createStatsByIndex();
builder.startObject(Fields.INDICES);
for (Map.Entry<Index, CommonStats> entry : indexStats.entrySet()) {
if (StatsLevel.INDICES.getRestName().equals(level)) {
assert statsByIndex != null || statsByShard != null : "Expected shard stats or index stats in response for generating ["
+ StatsLevel.INDICES
+ "] field";
if (statsByIndex == null) {
statsByIndex = createStatsByIndex(statsByShard);
}

builder.startObject(StatsLevel.INDICES.getRestName());
for (Map.Entry<Index, CommonStats> entry : statsByIndex.entrySet()) {
builder.startObject(entry.getKey().getName());
entry.getValue().toXContent(builder, params);
builder.endObject();
}
builder.endObject();
} else if ("shards".equals(level)) {
builder.startObject("shards");
} else if (StatsLevel.SHARDS.getRestName().equals(level)) {
builder.startObject(StatsLevel.SHARDS.getRestName());
assert statsByShard != null : "Expected shard stats in response for generating [" + StatsLevel.SHARDS + "] field";
for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShard.entrySet()) {
builder.startArray(entry.getKey().getName());
for (IndexShardStats indexShardStats : entry.getValue()) {
Expand All @@ -251,7 +380,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

private Map<Index, CommonStats> createStatsByIndex() {
private Map<Index, CommonStats> createStatsByIndex(Map<Index, List<IndexShardStats>> statsByShard) {
Map<Index, CommonStats> statsMap = new HashMap<>();
for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShard.entrySet()) {
if (!statsMap.containsKey(entry.getKey())) {
Expand Down Expand Up @@ -281,7 +410,21 @@ public List<IndexShardStats> getShardStats(Index index) {
*
* @opensearch.internal
*/
static final class Fields {
static final String INDICES = "indices";
@PublicApi(since = "2.17.0")
public enum StatsLevel {
INDICES("indices"),
SHARDS("shards"),
NODE("node");

private final String restName;

StatsLevel(String restName) {
this.restName = restName;
}

public String getRestName() {
return restName;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
String[] levels = Strings.splitStringByCommaToArray(request.param("level"));
nodesStatsRequest.indices().setLevels(levels);
nodesStatsRequest.setIncludeDiscoveryNodes(false);
nodesStatsRequest.indices().setIncludeIndicesStatsByLevel(true);

return channel -> client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public void processResponse(final NodesInfoResponse nodesInfoResponse) {
NodesStatsRequest.Metric.PROCESS.metricName(),
NodesStatsRequest.Metric.SCRIPT.metricName()
);
nodesStatsRequest.indices().setIncludeIndicesStatsByLevel(true);
client.admin().cluster().nodesStats(nodesStatsRequest, new RestResponseListener<NodesStatsResponse>(channel) {
@Override
public RestResponse buildResponse(NodesStatsResponse nodesStatsResponse) throws Exception {
Expand Down
Loading

0 comments on commit bf946d9

Please sign in to comment.