Add query group stats related structures #15338

Closed
57 commits
a1508ff
initial code for the sandbox resource tracking and cancellation frame…
kiranprakash154 May 30, 2024
3bf611e
Fix Failing Tests
kiranprakash154 May 30, 2024
71f6088
spotless Apply
kiranprakash154 May 30, 2024
71a61ec
Update SandboxService.java
kiranprakash154 May 30, 2024
60fb018
Update SandboxService.java
kiranprakash154 May 30, 2024
b39aa89
Update SandboxTask.java
kiranprakash154 May 30, 2024
827b3cf
Add java docs
kiranprakash154 May 30, 2024
e7a04aa
spotless
kiranprakash154 May 30, 2024
6677050
javadocs
kiranprakash154 May 30, 2024
ee15703
javadocs
kiranprakash154 May 30, 2024
15d6236
java docs
kiranprakash154 May 30, 2024
0690411
Update AbstractTaskCancellation.java
kiranprakash154 May 30, 2024
4251851
Update SandboxModule.java
kiranprakash154 May 30, 2024
a38dda9
Some tests and stubs
kiranprakash154 Jun 4, 2024
a5cf2c9
spotless
kiranprakash154 Jun 4, 2024
a76e963
:server:testingConventions
kiranprakash154 Jun 4, 2024
3eabc99
Update AbstractTaskCancellation.java
kiranprakash154 Jun 5, 2024
1565b11
more tests
kiranprakash154 Jun 5, 2024
d2e3f0a
addressing comments
kiranprakash154 Jul 3, 2024
3b31926
revert some accidentally pushed files
kiranprakash154 Jul 3, 2024
e250d7d
resolve flakiness
kiranprakash154 Jul 15, 2024
0ae7c85
renaming sandbox to querygroup and adjusting code based on merged PRs
kiranprakash154 Jul 18, 2024
c4906ad
jvm to memory
kiranprakash154 Jul 19, 2024
3e50f7a
missing java docs
kiranprakash154 Jul 19, 2024
9b2f270
spotless
kiranprakash154 Jul 19, 2024
d9e0858
Update CHANGELOG.md
kiranprakash154 Jul 19, 2024
1a652b2
pluck cancellation changes out of this PR
kiranprakash154 Jul 19, 2024
403605c
remove unused
kiranprakash154 Jul 19, 2024
6d0ee4f
remove cancellation related code and add more tests coverage
kiranprakash154 Jul 22, 2024
39c33b1
us only memory and not jvm
kiranprakash154 Jul 22, 2024
bc5e0aa
test conventions
kiranprakash154 Jul 22, 2024
b5d6fc8
Bring back enum
kiranprakash154 Jul 29, 2024
e2922f4
Update SearchBackpressureService.java
kiranprakash154 Jul 29, 2024
029593c
revert changes
kiranprakash154 Jul 29, 2024
b8ae0ba
revert changes
kiranprakash154 Jul 29, 2024
4b00edf
all required changes
kiranprakash154 Jul 29, 2024
28c160f
Update CHANGELOG.md
kiranprakash154 Jul 29, 2024
42787a5
cleanups
kiranprakash154 Jul 29, 2024
ceca794
Delete QueryGroupService.java
kiranprakash154 Jul 29, 2024
c18722d
cleanups
kiranprakash154 Jul 29, 2024
b95d97b
Update QueryGroupLevelResourceUsageViewTests.java
kiranprakash154 Jul 29, 2024
2e0597d
Update QueryGroupLevelResourceUsageViewTests.java
kiranprakash154 Jul 29, 2024
0c3351f
Update QueryGroupResourceUsageTrackerService.java
kiranprakash154 Jul 29, 2024
d7d2f98
Update QueryGroupResourceUsageTrackerService.java
kiranprakash154 Jul 30, 2024
ab87fb5
Update QueryGroupResourceUsageTrackerService.java
kiranprakash154 Jul 31, 2024
56a8b1c
Update CHANGELOG.md
kiranprakash154 Jul 31, 2024
a9c52c9
rebasing with latest main
kiranprakash154 Jul 31, 2024
f957a3c
remove experimental
kiranprakash154 Jul 31, 2024
f77206b
add queryGroup state to capture live view of QueryGroup stats
kaushalmahi12 Aug 1, 2024
daf0b70
add query group stats pojo
kaushalmahi12 Aug 12, 2024
87f1717
Merge branch 'main' into feature/sandbox-stats1
kaushalmahi12 Aug 12, 2024
6ef19f3
add missing javadoc
kaushalmahi12 Aug 14, 2024
c4262da
add query group state tests
kaushalmahi12 Aug 21, 2024
38f028f
bring stats changes into main
kaushalmahi12 Aug 21, 2024
522bc4d
remove unnecessary files
kaushalmahi12 Aug 21, 2024
c997bf6
apply spotless check
kaushalmahi12 Aug 21, 2024
b751f48
apply changelog entry
kaushalmahi12 Aug 21, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Support filtering on a large list encoded by bitmap ([#14774](https://github.com/opensearch-project/OpenSearch/pull/14774))
- Add slice execution listeners to SearchOperationListener interface ([#15153](https://github.com/opensearch-project/OpenSearch/pull/15153))
- Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895))
- [Workload Management] Add query group stats constructs ([#15338](https://github.com/opensearch-project/OpenSearch/pull/15338))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
125 changes: 125 additions & 0 deletions server/src/main/java/org/opensearch/wlm/stats/QueryGroupState.java
@@ -0,0 +1,125 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.wlm.stats;

import org.opensearch.search.ResourceType;

import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
* This class will keep the point in time view of the query group stats
*/
public class QueryGroupState {
/**
* completions at the query group level; this is a cumulative counter since OpenSearch start time
*/
private final AtomicLong completions = new AtomicLong();

/**
* rejections at the query group level; this is a cumulative counter since OpenSearch start time
*/
private final AtomicLong rejections = new AtomicLong();

/**
* this will track the cumulative failures in a query group
*/
private final AtomicLong failures = new AtomicLong();

/**
* This is used to store the resource type state both for CPU and MEMORY
*/
private final Map<ResourceType, ResourceTypeState> resourceState;

public QueryGroupState() {
resourceState = new EnumMap<>(ResourceType.class);
resourceState.put(ResourceType.CPU, new ResourceTypeState(ResourceType.CPU));
resourceState.put(ResourceType.MEMORY, new ResourceTypeState(ResourceType.MEMORY));
}

/**
*
* @return completions in the query group
*/
public long getCompletions() {
return completions.get();
}

/**
*
* @return rejections in the query group
*/
public long getRejections() {
return rejections.get();
}

/**
*
* @return failures in the query group
*/
public long getFailures() {
return failures.get();
}

/**
* getter for query group resource state
* @return the query group resource state
*/
public Map<ResourceType, ResourceTypeState> getResourceState() {
return resourceState;
}

/**
* this is a callback to increment completions for a query group at task level
*/
public void incrementCompletions() {
completions.incrementAndGet();
}

/**
* this is a call back to increment rejections for a query group at incoming request
*/
public void incrementRejections() {
rejections.incrementAndGet();
}

/**
* this is a call back to increment failures for a query group
*/
public void incrementFailures() {
failures.incrementAndGet();
}

/**
* This class holds the resource level stats for the query group
*/
public static class ResourceTypeState {
private final ResourceType resourceType;
private final AtomicLong cancellations = new AtomicLong();

public ResourceTypeState(ResourceType resourceType) {
this.resourceType = resourceType;
}

/**
* getter for resource type cancellations
*/
public long getCancellations() {
return cancellations.get();
}

/**
* this will be called when a task is cancelled due to this resource
*/
public void incrementCancellations() {
cancellations.incrementAndGet();
}
}
}
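
The counter pattern in `QueryGroupState` can be tried outside OpenSearch with a minimal sketch. It assumes a hypothetical `Resource` enum standing in for `org.opensearch.search.ResourceType`, and the class and method names are illustrative, not part of this PR. The map is fully populated in the constructor and never modified afterwards, so only the `AtomicLong` values change under concurrent access.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

final class QueryGroupStateSketch {
    // Hypothetical stand-in for org.opensearch.search.ResourceType.
    enum Resource { CPU, MEMORY }

    private final AtomicLong completions = new AtomicLong();
    // One cancellation counter per resource, created up front.
    private final Map<Resource, AtomicLong> cancellations = new EnumMap<>(Resource.class);

    QueryGroupStateSketch() {
        for (Resource resource : Resource.values()) {
            cancellations.put(resource, new AtomicLong());
        }
    }

    // Callback for a finished task.
    void onTaskCompleted() {
        completions.incrementAndGet();
    }

    // Callback for a task cancelled because of the given resource.
    void onTaskCancelled(Resource cause) {
        cancellations.get(cause).incrementAndGet();
    }

    long completions() {
        return completions.get();
    }

    long cancellations(Resource resource) {
        return cancellations.get(resource).get();
    }

    public static void main(String[] args) {
        QueryGroupStateSketch state = new QueryGroupStateSketch();
        state.onTaskCompleted();
        state.onTaskCompleted();
        state.onTaskCancelled(Resource.CPU);
        // prints "completions=2 cpu=1 memory=0"
        System.out.println(
            "completions=" + state.completions()
                + " cpu=" + state.cancellations(Resource.CPU)
                + " memory=" + state.cancellations(Resource.MEMORY)
        );
    }
}
```

Each read returns the value of one counter at the moment of the call. Reading several counters in sequence is not an atomic snapshot across all of them, which is usually acceptable for monitoring stats.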
197 changes: 197 additions & 0 deletions server/src/main/java/org/opensearch/wlm/stats/QueryGroupStats.java
@@ -0,0 +1,197 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.wlm.stats;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.search.ResourceType;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;

/**
* {
* "queryGroupID": {
* "completions": 1233234234,
* "rejections": 1243545,
* "failures": 97,
* "CPU": { "current_usage": 49.6, "cancellations": 432 },
* "MEMORY": { "current_usage": 39.6, "cancellations": 42 }
* },
* ...
* ...
* }
*/
public class QueryGroupStats implements ToXContentObject, Writeable {
private final Map<String, QueryGroupStatsHolder> stats;

public QueryGroupStats(Map<String, QueryGroupStatsHolder> stats) {
this.stats = stats;
}

public QueryGroupStats(StreamInput in) throws IOException {
stats = in.readMap(StreamInput::readString, QueryGroupStatsHolder::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(stats, StreamOutput::writeString, QueryGroupStatsHolder::writeTo);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
for (Map.Entry<String, QueryGroupStatsHolder> queryGroupStats : stats.entrySet()) {
builder.startObject(queryGroupStats.getKey());
queryGroupStats.getValue().toXContent(builder, params);
builder.endObject();
}
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
QueryGroupStats that = (QueryGroupStats) o;
return Objects.equals(stats, that.stats);
}

@Override
public int hashCode() {
return Objects.hash(stats);
}

/**
* Stats holder for the data of a single query group at a point in time.
* Instances are only created on demand through the stats API.
*/
public static class QueryGroupStatsHolder implements ToXContentObject, Writeable {
public static final String COMPLETIONS = "completions";
public static final String REJECTIONS = "rejections";
private final long completions;
private final long rejections;
private final Map<ResourceType, ResourceStats> resourceStats;

public QueryGroupStatsHolder(long completions, long rejections, Map<ResourceType, ResourceStats> resourceStats) {
this.completions = completions;
this.rejections = rejections;
this.resourceStats = resourceStats;
}

public QueryGroupStatsHolder(StreamInput in) throws IOException {
this.completions = in.readVLong();
this.rejections = in.readVLong();
this.resourceStats = in.readMap((i) -> ResourceType.fromName(i.readString()), ResourceStats::new);
}

/**
* Writes the {@code statsHolder} to {@code out}
* @param out StreamOutput
* @param statsHolder QueryGroupStatsHolder
* @throws IOException exception
*/
public static void writeTo(StreamOutput out, QueryGroupStatsHolder statsHolder) throws IOException {
out.writeVLong(statsHolder.completions);
out.writeVLong(statsHolder.rejections);
out.writeMap(statsHolder.resourceStats, (o, val) -> o.writeString(val.getName()), ResourceStats::writeTo);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
QueryGroupStatsHolder.writeTo(out, this);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(COMPLETIONS, completions);
builder.field(REJECTIONS, rejections);
for (Map.Entry<ResourceType, ResourceStats> resourceStat : resourceStats.entrySet()) {
ResourceType resourceType = resourceStat.getKey();
ResourceStats resourceStats1 = resourceStat.getValue();
builder.startObject(resourceType.getName());
resourceStats1.toXContent(builder, params);
builder.endObject();
}
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
QueryGroupStatsHolder that = (QueryGroupStatsHolder) o;
return completions == that.completions && rejections == that.rejections && Objects.equals(resourceStats, that.resourceStats);
}

@Override
public int hashCode() {
return Objects.hash(completions, rejections, resourceStats);
}
}

/**
* point in time resource level stats holder
*/
public static class ResourceStats implements ToXContentObject, Writeable {
public static final String CURRENT_USAGE = "current_usage";
public static final String CANCELLATIONS = "cancellations";
private final double currentUsage;
private final long cancellations;

public ResourceStats(double currentUsage, long cancellations) {
this.currentUsage = currentUsage;
this.cancellations = cancellations;
}

public ResourceStats(StreamInput in) throws IOException {
this.currentUsage = in.readDouble();
this.cancellations = in.readVLong();
}

/**
* Writes the {@code stats} to {@code out}
* @param out StreamOutput
* @param stats ResourceStats
* @throws IOException exception
*/
public static void writeTo(StreamOutput out, ResourceStats stats) throws IOException {
out.writeDouble(stats.currentUsage);
out.writeVLong(stats.cancellations);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
ResourceStats.writeTo(out, this);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(CURRENT_USAGE, currentUsage);
builder.field(CANCELLATIONS, cancellations);
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ResourceStats that = (ResourceStats) o;
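// compare with an absolute tolerance; without Math.abs any smaller currentUsage would compare as equal
return Math.abs(currentUsage - that.currentUsage) < 1e-9 && cancellations == that.cancellations;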
}

@Override
public int hashCode() {
return Objects.hash(currentUsage, cancellations);
}
}
}
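
The `Writeable` classes above rely on the read constructor consuming fields in exactly the order `writeTo` emits them. The sketch below illustrates that symmetry under stated assumptions: `java.io.DataOutputStream` and `DataInputStream` stand in for OpenSearch's `StreamOutput` and `StreamInput`, `writeLong` replaces `writeVLong`, and the class name is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class ResourceStatsRoundTripSketch {
    final double currentUsage;
    final long cancellations;

    ResourceStatsRoundTripSketch(double currentUsage, long cancellations) {
        this.currentUsage = currentUsage;
        this.cancellations = cancellations;
    }

    // Read constructor: consumes fields in the same order writeTo emits them.
    ResourceStatsRoundTripSketch(DataInputStream in) throws IOException {
        this.currentUsage = in.readDouble();
        this.cancellations = in.readLong();
    }

    void writeTo(DataOutputStream out) throws IOException {
        out.writeDouble(currentUsage);
        out.writeLong(cancellations);
    }

    // Serializes to a byte array and reads a fresh copy back.
    static ResourceStatsRoundTripSketch roundTrip(ResourceStatsRoundTripSketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                original.writeTo(out);
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return new ResourceStatsRoundTripSketch(in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ResourceStatsRoundTripSketch copy = roundTrip(new ResourceStatsRoundTripSketch(49.6, 432));
        // prints "49.6 432"
        System.out.println(copy.currentUsage + " " + copy.cancellations);
    }
}
```

A round trip of this kind is a common way to test wire serialization: swapping the two reads in the constructor would make the copy differ from the original.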
@@ -0,0 +1,55 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.wlm.stats;

import org.opensearch.search.ResourceType;
import org.opensearch.test.OpenSearchTestCase;

import java.util.ArrayList;
import java.util.List;

public class QueryGroupStateTests extends OpenSearchTestCase {
QueryGroupState queryGroupState;

public void testRandomQueryGroupsStateUpdates() {
queryGroupState = new QueryGroupState();
List<Thread> updaterThreads = new ArrayList<>();

for (int i = 0; i < 25; i++) {
if (i % 5 == 0) {
updaterThreads.add(new Thread(() -> queryGroupState.incrementCompletions()));
} else if (i % 5 == 1) {
updaterThreads.add(new Thread(() -> queryGroupState.incrementRejections()));
} else if (i % 5 == 2) {
updaterThreads.add(new Thread(() -> queryGroupState.incrementFailures()));
} else if (i % 5 == 3) {
updaterThreads.add(new Thread(() -> queryGroupState.getResourceState().get(ResourceType.CPU).incrementCancellations()));
} else {
updaterThreads.add(new Thread(() -> queryGroupState.getResourceState().get(ResourceType.MEMORY).incrementCancellations()));
}
}

// trigger the updates
updaterThreads.forEach(Thread::start);
// wait for updates to be finished
updaterThreads.forEach(thread -> {
try {
thread.join();
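} catch (InterruptedException e) {
// restore the interrupt flag instead of silently swallowing the interruption
Thread.currentThread().interrupt();
}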
});

assertEquals(5, queryGroupState.getCompletions());
assertEquals(5, queryGroupState.getRejections());
assertEquals(5, queryGroupState.getFailures());
assertEquals(5, queryGroupState.getResourceState().get(ResourceType.CPU).getCancellations());
assertEquals(5, queryGroupState.getResourceState().get(ResourceType.MEMORY).getCancellations());
}
}
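
The test above starts 25 threads that each perform a single increment, so the updates may barely overlap in practice. A possible variation, sketched below with plain JDK classes only, releases all workers at once through a `CountDownLatch` and has each perform many increments, which makes contention on the counter more likely. The class and method names are hypothetical and not part of this PR.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class ConcurrentCounterCheckSketch {
    // Runs `threads` workers that each increment a shared counter
    // `incrementsPerThread` times, then returns the final count.
    static long incrementConcurrently(int threads, int incrementsPerThread) {
        AtomicLong counter = new AtomicLong();
        CountDownLatch startGate = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            for (int t = 0; t < threads; t++) {
                pool.execute(() -> {
                    try {
                        startGate.await(); // block until every worker is queued
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    for (int i = 0; i < incrementsPerThread; i++) {
                        counter.incrementAndGet();
                    }
                });
            }
            startGate.countDown(); // release all workers together
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        // prints "8000"
        System.out.println(incrementConcurrently(8, 1000));
    }
}
```

With a plain `long` field in place of the `AtomicLong`, the same harness would be expected to lose updates intermittently, which is the failure mode the atomic counters in `QueryGroupState` guard against.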