Skip to content

Commit

Permalink
Add data models for health stats api
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <cyji@amazon.com>
  • Loading branch information
ansjcy committed Sep 17, 2024
1 parent 290c701 commit 4fd6b0f
Showing 15 changed files with 723 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@
package org.opensearch.plugin.insights.core.service;

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_GROUPING_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getExporterSettings;

import java.io.IOException;
@@ -33,6 +34,8 @@
import org.opensearch.plugin.insights.rules.model.GroupingType;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.rules.model.healthStats.QueryInsightsHealthStats;
import org.opensearch.plugin.insights.rules.model.healthStats.TopQueriesHealthStats;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.threadpool.Scheduler;
@@ -439,4 +442,21 @@ protected void doClose() throws IOException {
queryInsightsExporterFactory.closeAllExporters();
queryInsightsReaderFactory.closeAllReaders();
}

/**
* Get health stats for query insights services
*
* @return QueryInsightsHealthStats
*/
public QueryInsightsHealthStats getHealthStats() {
Map<MetricType, TopQueriesHealthStats> topQueriesHealthStatsMap = new HashMap<>();
for (Map.Entry<MetricType, TopQueriesService> entry : topQueriesServices.entrySet()) {
topQueriesHealthStatsMap.put(entry.getKey(), entry.getValue().getHealthStats());
}
return new QueryInsightsHealthStats(
threadPool.info(QUERY_INSIGHTS_EXECUTOR),
this.queryRecordsQueue.size(),
topQueriesHealthStatsMap
);
}
}
Original file line number Diff line number Diff line change
@@ -49,6 +49,7 @@
import org.opensearch.plugin.insights.rules.model.GroupingType;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.rules.model.healthStats.TopQueriesHealthStats;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.threadpool.ThreadPool;

@@ -516,4 +517,13 @@ private void drain() {
topQueriesHistorySnapshot.set(new ArrayList<>());
topQueriesCurrentSnapshot.set(new ArrayList<>());
}

/**
* Get top queries service health stats
*
* @return TopQueriesHealthStats
*/
public TopQueriesHealthStats getHealthStats() {
return new TopQueriesHealthStats(this.topQueriesStore.size(), this.queryGrouper.getHealthStats());
}
}
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@
import org.opensearch.plugin.insights.rules.model.GroupingType;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.rules.model.healthStats.QueryGrouperHealthStats;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;

/**
@@ -39,12 +40,12 @@ public class MinMaxHeapQueryGrouper implements QueryGrouper {
/**
* Metric type for the current grouping service
*/
private MetricType metricType;
private final MetricType metricType;

/**
* Aggregation type for the current grouping service
*/
private AggregationType aggregationType;
private final AggregationType aggregationType;
/**
* Map storing groupingId to Tuple containing Aggregate search query record and boolean.
* SearchQueryRecord: Aggregate search query record to store the aggregate of a metric type based on the aggregation type..
@@ -53,18 +54,18 @@ public class MinMaxHeapQueryGrouper implements QueryGrouper {
* boolean: True if the aggregate record is in the Top N queries priority query (min heap) and False if the aggregate
* record is in the Max Heap
*/
private ConcurrentHashMap<String, Tuple<SearchQueryRecord, Boolean>> groupIdToAggSearchQueryRecord;
private final ConcurrentHashMap<String, Tuple<SearchQueryRecord, Boolean>> groupIdToAggSearchQueryRecord;
/**
* Min heap to keep track of the Top N query groups and is passed from TopQueriesService as the topQueriesStore
*/
private PriorityBlockingQueue<SearchQueryRecord> minHeapTopQueriesStore;
private final PriorityBlockingQueue<SearchQueryRecord> minHeapTopQueriesStore;
/**
* The Max heap is an overflow data structure used to manage records that exceed the capacity of the Min heap.
* It stores all records not included in the Top N query results. When the aggregate measurement for one of these
* records is updated and it now qualifies as part of the Top N, the record is moved from the Max heap to the Min heap,
* and the records are rearranged accordingly.
*/
private PriorityBlockingQueue<SearchQueryRecord> maxHeapQueryStore;
private final PriorityBlockingQueue<SearchQueryRecord> maxHeapQueryStore;

/**
* Top N size based on the configuration set
@@ -80,11 +81,11 @@ public class MinMaxHeapQueryGrouper implements QueryGrouper {
private int maxGroups;

public MinMaxHeapQueryGrouper(
MetricType metricType,
GroupingType groupingType,
AggregationType aggregationType,
PriorityBlockingQueue<SearchQueryRecord> topQueriesStore,
int topNSize
final MetricType metricType,
final GroupingType groupingType,
final AggregationType aggregationType,
final PriorityBlockingQueue<SearchQueryRecord> topQueriesStore,
final int topNSize
) {
this.groupingType = groupingType;
this.metricType = metricType;
@@ -103,7 +104,7 @@ public MinMaxHeapQueryGrouper(
* @return return the search query record that represents the group
*/
@Override
public SearchQueryRecord add(SearchQueryRecord searchQueryRecord) {
public SearchQueryRecord add(final SearchQueryRecord searchQueryRecord) {
if (groupingType == GroupingType.NONE) {
throw new IllegalArgumentException("Do not use addQueryToGroup when GroupingType is None");
}
@@ -120,8 +121,7 @@ public SearchQueryRecord add(SearchQueryRecord searchQueryRecord) {
// Add to min PQ and promote to max
// If max PQ is empty return else try to promote record from max to min
if (!groupIdToAggSearchQueryRecord.containsKey(groupId)) {
boolean maxGroupsLimitReached = checkMaxGroupsLimitReached(groupId);
if (maxGroupsLimitReached) {
if (checkMaxGroupsLimitReached(groupId)) {
return null;
}
aggregateSearchQueryRecord = searchQueryRecord;
@@ -158,7 +158,7 @@ public void drain() {
* @return grouping type changed
*/
@Override
public boolean setGroupingType(GroupingType newGroupingType) {
public boolean setGroupingType(final GroupingType newGroupingType) {
if (this.groupingType != newGroupingType) {
this.groupingType = newGroupingType;
drain();
@@ -183,7 +183,7 @@ public GroupingType getGroupingType() {
* @return max groups changed
*/
@Override
public boolean setMaxGroups(int maxGroups) {
public boolean setMaxGroups(final int maxGroups) {
if (this.maxGroups != maxGroups) {
this.maxGroups = maxGroups;
drain();
@@ -197,17 +197,21 @@ public boolean setMaxGroups(int maxGroups) {
* @param newSize new size
*/
@Override
public void updateTopNSize(int newSize) {
public void updateTopNSize(final int newSize) {
this.topNSize = newSize;
}

private void addToMinPQ(SearchQueryRecord searchQueryRecord, String groupId) {
private void addToMinPQ(final SearchQueryRecord searchQueryRecord, final String groupId) {
minHeapTopQueriesStore.add(searchQueryRecord);
groupIdToAggSearchQueryRecord.put(groupId, new Tuple<>(searchQueryRecord, true));
overflow();
}

private void addAndPromote(SearchQueryRecord searchQueryRecord, SearchQueryRecord aggregateSearchQueryRecord, String groupId) {
private void addAndPromote(
final SearchQueryRecord searchQueryRecord,
final SearchQueryRecord aggregateSearchQueryRecord,
final String groupId
) {
Number measurementToAdd = searchQueryRecord.getMeasurement(metricType);
aggregateSearchQueryRecord.addMeasurement(metricType, measurementToAdd);
addToMinPQ(aggregateSearchQueryRecord, groupId);
@@ -228,7 +232,7 @@ private void overflow() {
}
}

private boolean checkMaxGroupsLimitReached(String groupId) {
private boolean checkMaxGroupsLimitReached(final String groupId) {
if (maxGroups <= maxHeapQueryStore.size() && minHeapTopQueriesStore.size() >= topNSize) {
log.warn(
"Exceeded [{}] setting threshold which is set at {}. Discarding new group with id {}.",
@@ -259,11 +263,11 @@ int numberOfTopGroups() {
}

/**
* Get groupingId. This should be query hashcode for SIMILARITY grouping and user_id for USER_ID grouping.
* Get groupingId. This should be the query hashcode for SIMILARITY grouping and user_id for USER_ID grouping.
* @param searchQueryRecord record
* @return Grouping Id
*/
private String getGroupingId(SearchQueryRecord searchQueryRecord) {
private String getGroupingId(final SearchQueryRecord searchQueryRecord) {
switch (groupingType) {
case SIMILARITY:
return searchQueryRecord.getAttributes().get(Attribute.QUERY_HASHCODE).toString();
@@ -273,4 +277,13 @@ private String getGroupingId(SearchQueryRecord searchQueryRecord) {
throw new IllegalArgumentException("The following grouping type is not supported : " + groupingType);
}
}

/**
* Get health stats of the MinMaxHeapQueryGrouperService
*
* @return QueryGrouperHealthStats
*/
public QueryGrouperHealthStats getHealthStats() {
return new QueryGrouperHealthStats(this.groupIdToAggSearchQueryRecord.size(), this.maxHeapQueryStore.size());
}
}
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@

import org.opensearch.plugin.insights.rules.model.GroupingType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.rules.model.healthStats.QueryGrouperHealthStats;

/**
* Interface for grouping search queries based on grouping type for the metric type.
@@ -57,4 +58,11 @@ public interface QueryGrouper {
* @param topNSize the new top N size
*/
void updateTopNSize(int topNSize);

/**
* Get health stats of the QueryGrouperService
*
* @return QueryGrouperHealthStats
*/
QueryGrouperHealthStats getHealthStats();
}
Original file line number Diff line number Diff line change
@@ -62,7 +62,7 @@ public static MetricType fromString(final String metricType) {
* @param metricType the MetricType to write
* @throws IOException IOException
*/
static void writeTo(final StreamOutput out, final MetricType metricType) throws IOException {
public static void writeTo(final StreamOutput out, final MetricType metricType) throws IOException {
out.writeString(metricType.toString());
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights.rules.model.healthStats;

import java.io.IOException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;

/**
* Represents the health statistics of the query grouper.
*/
public class QueryGrouperHealthStats implements ToXContentFragment, Writeable {
private final int queryGroupCount;
private final int queryGroupHeapSize;
private static final String QUERY_GROUP_COUNT = "QueryGroupCount";
private static final String QUERY_GROUP_HEAP_SIZE = "QueryGroupHeapSize";

/**
* Constructor to read QueryGrouperHealthStats from a StreamInput.
*
* @param in the StreamInput to read the QueryGrouperHealthStats from
* @throws IOException IOException
*/
public QueryGrouperHealthStats(final StreamInput in) throws IOException {
this.queryGroupCount = in.readInt();
this.queryGroupHeapSize = in.readInt();
}

/**
* Constructor of QueryGrouperHealthStats
*
* @param queryGroupCount Number of groups in the grouper
* @param queryGroupHeapSize Heap size of the grouper
*/
public QueryGrouperHealthStats(final int queryGroupCount, final int queryGroupHeapSize) {
this.queryGroupCount = queryGroupCount;
this.queryGroupHeapSize = queryGroupHeapSize;
}

/**
* Write QueryGrouperHealthStats Object to output stream
* @param out streamOutput
* @throws IOException IOException
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeInt(queryGroupCount);
out.writeInt(queryGroupHeapSize);
}

/**
* Write QueryGrouperHealthStats object to XContent
*
* @param builder XContentBuilder
* @param params Parameters
* @return XContentBuilder
* @throws IOException IOException
*/
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(QUERY_GROUP_COUNT, queryGroupCount);
builder.field(QUERY_GROUP_HEAP_SIZE, queryGroupHeapSize);
return builder;
}

/**
* Gets the number of query groups.
*
* @return the query group count
*/
public int getQueryGroupCount() {
return queryGroupCount;
}

/**
* Gets the query group heap size.
*
* @return the query group heap size
*/
public int getQueryGroupHeapSize() {
return queryGroupHeapSize;
}
}
Loading

0 comments on commit 4fd6b0f

Please sign in to comment.