From d2da9bb18f55cb4a9e5dca4c60169160001fc28c Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Fri, 7 Jun 2024 11:37:52 -0700 Subject: [PATCH] Query insights exporters implementation (#12982) (#14061) --------- Signed-off-by: Chenyang Ji (cherry picked from commit 0ddf4bdd9ef14b2d8e928695bef08429579a6a5b) --- CHANGELOG.md | 1 + .../plugin/insights/QueryInsightsPlugin.java | 5 +- .../insights/core/exporter/DebugExporter.java | 61 ++++++++ .../core/exporter/LocalIndexExporter.java | 113 ++++++++++++++ .../core/exporter/QueryInsightsExporter.java | 26 ++++ .../QueryInsightsExporterFactory.java | 143 ++++++++++++++++++ .../insights/core/exporter/SinkType.java | 66 ++++++++ .../insights/core/exporter/package-info.java | 12 ++ .../core/service/QueryInsightsService.java | 37 ++++- .../core/service/TopQueriesService.java | 97 +++++++++++- .../rules/model/SearchQueryRecord.java | 7 + .../settings/QueryInsightsSettings.java | 33 ++++ .../insights/QueryInsightsPluginTests.java | 4 +- .../core/exporter/DebugExporterTests.java | 37 +++++ .../exporter/LocalIndexExporterTests.java | 99 ++++++++++++ .../QueryInsightsExporterFactoryTests.java | 89 +++++++++++ .../service/QueryInsightsServiceTests.java | 18 ++- .../core/service/TopQueriesServiceTests.java | 8 +- 18 files changed, 845 insertions(+), 11 deletions(-) create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporter.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/SinkType.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/package-info.java create mode 100644 plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/DebugExporterTests.java create mode 100644 plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java create mode 100644 plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactoryTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f71b3034fcd1..6c41aaa67a2a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add getMetadataFields to MapperService ([#13819](https://github.com/opensearch-project/OpenSearch/pull/13819)) - Allow setting query parameters on requests ([#13776](https://github.com/opensearch-project/OpenSearch/issues/13776)) - [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027)) +- [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982)) ### Dependencies - Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559)) diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java index 4d7e0d486068a..22831c3e0f8ba 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java @@ -70,7 +70,7 @@ public Collection createComponents( final Supplier repositoriesServiceSupplier ) { // create top n queries service - final QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool); + final QueryInsightsService queryInsightsService = new QueryInsightsService(clusterService.getClusterSettings(), threadPool, client); return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService)); } @@ -110,7 +110,8 @@ public List> getSettings() { // Settings for top N queries QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED, QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE, - QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE, + QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS ); } } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java new file mode 100644 index 0000000000000..116bd26e1f9bc --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java @@ -0,0 +1,61 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.exporter; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; + +import java.util.List; + +/** + * Debug exporter for development purpose + */ +public final class DebugExporter implements QueryInsightsExporter { + /** + * Logger of the debug exporter + */ + private final Logger logger = LogManager.getLogger(); + + /** + * Constructor of DebugExporter + */ + private DebugExporter() {} + + private static class InstanceHolder { + private static final DebugExporter INSTANCE = new DebugExporter(); + } + + /** + Get the singleton instance of DebugExporter + * + @return DebugExporter instance + */ + public static DebugExporter getInstance() { + return InstanceHolder.INSTANCE; + } + + /** + * Write the list of SearchQueryRecord to debug log + * + * @param records list of {@link SearchQueryRecord} + */ + @Override + public void export(final List records) { + logger.debug("QUERY_INSIGHTS_RECORDS: " + records.toString()); + } + + /** + * Close the debugger exporter sink + */ + @Override + public void close() { + logger.debug("Closing the DebugExporter.."); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java new file mode 100644 index 0000000000000..c19fe3655098b --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java @@ -0,0 +1,113 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.exporter; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.bulk.BulkRequestBuilder; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.client.Client; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormatter; + +import java.util.List; + +/** + * Local index exporter for exporting query insights data to local OpenSearch indices. + */ +public final class LocalIndexExporter implements QueryInsightsExporter { + /** + * Logger of the local index exporter + */ + private final Logger logger = LogManager.getLogger(); + private final Client client; + private DateTimeFormatter indexPattern; + + /** + * Constructor of LocalIndexExporter + * + * @param client OS client + * @param indexPattern the pattern of index to export to + */ + public LocalIndexExporter(final Client client, final DateTimeFormatter indexPattern) { + this.indexPattern = indexPattern; + this.client = client; + } + + /** + * Getter of indexPattern + * + * @return indexPattern + */ + public DateTimeFormatter getIndexPattern() { + return indexPattern; + } + + /** + * Setter of indexPattern + * + * @param indexPattern index pattern + * @return the current LocalIndexExporter + */ + public LocalIndexExporter setIndexPattern(DateTimeFormatter indexPattern) { + this.indexPattern = indexPattern; + return this; + } + + /** + * Export a list of SearchQueryRecord to a local index + * + * @param records list of {@link SearchQueryRecord} + */ + @Override + public void export(final List records) { + if (records == null || records.size() == 0) { + return; + } + try { + final String index = getDateTimeFromFormat(); + final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setTimeout(TimeValue.timeValueMinutes(1)); + for (SearchQueryRecord record : records) { + bulkRequestBuilder.add( + new IndexRequest(index).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + ); + } + bulkRequestBuilder.execute(new ActionListener() { + @Override + public void onResponse(BulkResponse bulkItemResponses) {} + + @Override + public void onFailure(Exception e) { + logger.error("Failed to execute bulk operation for query insights data: ", e); + } + }); + } catch (final Exception e) { + logger.error("Unable to index query insights data: ", e); + } + } + + /** + * Close the exporter sink + */ + @Override + public void close() { + logger.debug("Closing the LocalIndexExporter.."); + } + + private String getDateTimeFromFormat() { + return indexPattern.print(DateTime.now(DateTimeZone.UTC)); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporter.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporter.java new file mode 100644 index 0000000000000..42e5354eb1640 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporter.java @@ -0,0 +1,26 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.exporter; + +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; + +import java.io.Closeable; +import java.util.List; + +/** + * Base interface for Query Insights exporters + */ +public interface QueryInsightsExporter extends Closeable { + /** + * Export a list of SearchQueryRecord to the exporter sink + * + * @param records list of {@link SearchQueryRecord} + */ + void export(final List records); +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java new file mode 100644 index 0000000000000..7324590c9f582 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java @@ -0,0 +1,143 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.exporter; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.client.Client; +import org.opensearch.common.settings.Settings; +import org.joda.time.format.DateTimeFormat; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Locale; +import java.util.Set; + +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_QUERIES_EXPORTER_TYPE; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORT_INDEX; + +/** + * Factory class for validating and creating exporters based on provided settings + */ +public class QueryInsightsExporterFactory { + /** + * Logger of the query insights exporter factory + */ + private final Logger logger = LogManager.getLogger(); + final private Client client; + final private Set exporters; + + /** + * Constructor of QueryInsightsExporterFactory + * + * @param client OS client + */ + public QueryInsightsExporterFactory(final Client client) { + this.client = client; + this.exporters = new HashSet<>(); + } + + /** + * Validate exporter sink config + * + * @param settings exporter sink config {@link Settings} + * @throws IllegalArgumentException if provided exporter sink config settings are invalid + */ + public void validateExporterConfig(final Settings settings) throws IllegalArgumentException { + // Disable exporter if the EXPORTER_TYPE setting is null + if (settings.get(EXPORTER_TYPE) == null) { + return; + } + SinkType type; + try { + type = SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE)); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException( + String.format( + Locale.ROOT, + "Invalid exporter type [%s], type should be one of %s", + settings.get(EXPORTER_TYPE), + SinkType.allSinkTypes() + ) + ); + } + switch (type) { + case LOCAL_INDEX: + final String indexPattern = settings.get(EXPORT_INDEX, DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN); + if (indexPattern.length() == 0) { + throw new IllegalArgumentException("Empty index pattern configured for the exporter"); + } + try { + DateTimeFormat.forPattern(indexPattern); + } catch (Exception e) { + throw new IllegalArgumentException( + String.format(Locale.ROOT, "Invalid index pattern [%s] configured for the exporter", indexPattern) + ); + } + } + } + + /** + * Create an exporter based on provided parameters + * + * @param type The type of exporter to create + * @param indexPattern the index pattern if creating a index exporter + * @return QueryInsightsExporter the created exporter sink + */ + public QueryInsightsExporter createExporter(SinkType type, String indexPattern) { + if (SinkType.LOCAL_INDEX.equals(type)) { + QueryInsightsExporter exporter = new LocalIndexExporter(client, DateTimeFormat.forPattern(indexPattern)); + this.exporters.add(exporter); + return exporter; + } + return DebugExporter.getInstance(); + } + + /** + * Update an exporter based on provided parameters + * + * @param exporter The exporter to update + * @param indexPattern the index pattern if creating a index exporter + * @return QueryInsightsExporter the updated exporter sink + */ + public QueryInsightsExporter updateExporter(QueryInsightsExporter exporter, String indexPattern) { + if (exporter.getClass() == LocalIndexExporter.class) { + ((LocalIndexExporter) exporter).setIndexPattern(DateTimeFormat.forPattern(indexPattern)); + } + return exporter; + } + + /** + * Close an exporter + * + * @param exporter the exporter to close + */ + public void closeExporter(QueryInsightsExporter exporter) throws IOException { + if (exporter != null) { + exporter.close(); + this.exporters.remove(exporter); + } + } + + /** + * Close all exporters + * + */ + public void closeAllExporters() { + for (QueryInsightsExporter exporter : exporters) { + try { + closeExporter(exporter); + } catch (IOException e) { + logger.error("Fail to close query insights exporter, error: ", e); + } + } + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/SinkType.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/SinkType.java new file mode 100644 index 0000000000000..c90c9c76b6706 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/SinkType.java @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.exporter; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Type of supported sinks + */ +public enum SinkType { + /** debug exporter */ + DEBUG("debug"), + /** local index exporter */ + LOCAL_INDEX("local_index"); + + private final String type; + + SinkType(String type) { + this.type = type; + } + + @Override + public String toString() { + return type; + } + + /** + * Parse SinkType from String + * @param type the String representation of the SinkType + * @return SinkType + */ + public static SinkType parse(final String type) { + return valueOf(type.toUpperCase(Locale.ROOT)); + } + + /** + * Get all valid SinkTypes + * + * @return A set contains all valid SinkTypes + */ + public static Set allSinkTypes() { + return Arrays.stream(values()).collect(Collectors.toSet()); + } + + /** + * Get Sink type from exporter + * + * @param exporter the {@link QueryInsightsExporter} + * @return SinkType associated with this exporter + */ + public static SinkType getSinkTypeFromExporter(QueryInsightsExporter exporter) { + if (exporter.getClass().equals(LocalIndexExporter.class)) { + return SinkType.LOCAL_INDEX; + } + return SinkType.DEBUG; + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/package-info.java new file mode 100644 index 0000000000000..7164411194f85 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Query Insights exporter + */ +package org.opensearch.plugin.insights.core.exporter; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java index 525ca0d4a3d33..a83bb2094f165 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java @@ -8,14 +8,18 @@ package org.opensearch.plugin.insights.core.service; +import org.opensearch.client.Client; import org.opensearch.common.inject.Inject; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; import org.opensearch.plugin.insights.settings.QueryInsightsSettings; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; +import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; @@ -23,6 +27,8 @@ import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS; + /** * Service responsible for gathering, analyzing, storing and exporting * information related to search queries @@ -56,21 +62,35 @@ public class QueryInsightsService extends AbstractLifecycleComponent { */ protected volatile Scheduler.Cancellable scheduledFuture; + /** + * Query Insights exporter factory + */ + final QueryInsightsExporterFactory queryInsightsExporterFactory; + /** * Constructor of the QueryInsightsService * - * @param threadPool The OpenSearch thread pool to run async tasks + * @param clusterSettings OpenSearch cluster level settings + * @param threadPool The OpenSearch thread pool to run async tasks + * @param client OS client */ @Inject - public QueryInsightsService(final ThreadPool threadPool) { + public QueryInsightsService(final ClusterSettings clusterSettings, final ThreadPool threadPool, final Client client) { enableCollect = new HashMap<>(); queryRecordsQueue = new LinkedBlockingQueue<>(QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY); + this.threadPool = threadPool; + this.queryInsightsExporterFactory = new QueryInsightsExporterFactory(client); + // initialize top n queries services and configurations consumers topQueriesServices = new HashMap<>(); for (MetricType metricType : MetricType.allMetricTypes()) { enableCollect.put(metricType, false); - topQueriesServices.put(metricType, new TopQueriesService(metricType)); + topQueriesServices.put(metricType, new TopQueriesService(metricType, threadPool, queryInsightsExporterFactory)); } - this.threadPool = threadPool; + clusterSettings.addSettingsUpdateConsumer( + TOP_N_LATENCY_EXPORTER_SETTINGS, + (settings -> getTopQueriesService(MetricType.LATENCY).setExporter(settings)), + (settings -> getTopQueriesService(MetricType.LATENCY).validateExporterConfig(settings)) + ); } /** @@ -176,5 +196,12 @@ protected void doStop() { } @Override - protected void doClose() {} + protected void doClose() throws IOException { + // close all top n queries service + for (TopQueriesService topQueriesService : topQueriesServices.values()) { + topQueriesService.close(); + } + // close any unclosed resources + queryInsightsExporterFactory.closeAllExporters(); + } } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java index d2c30cbdf98e7..ff90edf1ec33d 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java @@ -8,11 +8,19 @@ package org.opensearch.plugin.insights.core.service; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter; +import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory; +import org.opensearch.plugin.insights.core.exporter.SinkType; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.threadpool.ThreadPool; +import java.io.IOException; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; @@ -27,6 +35,12 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_QUERIES_EXPORTER_TYPE; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORT_INDEX; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR; + /** * Service responsible for gathering and storing top N queries * with high latency or resource usage @@ -34,6 +48,10 @@ * @opensearch.internal */ public class TopQueriesService { + /** + * Logger of the local index exporter + */ + private final Logger logger = LogManager.getLogger(); private boolean enabled; /** * The metric type to measure top n queries @@ -63,12 +81,34 @@ public class TopQueriesService { */ private final AtomicReference> topQueriesHistorySnapshot; - TopQueriesService(final MetricType metricType) { + /** + * Factory for validating and creating exporters + */ + private final QueryInsightsExporterFactory queryInsightsExporterFactory; + + /** + * The internal OpenSearch thread pool that execute async processing and exporting tasks + */ + private final ThreadPool threadPool; + + /** + * Exporter for exporting top queries data + */ + private QueryInsightsExporter exporter; + + TopQueriesService( + final MetricType metricType, + final ThreadPool threadPool, + final QueryInsightsExporterFactory queryInsightsExporterFactory + ) { this.enabled = false; this.metricType = metricType; + this.threadPool = threadPool; + this.queryInsightsExporterFactory = queryInsightsExporterFactory; this.topNSize = QueryInsightsSettings.DEFAULT_TOP_N_SIZE; this.windowSize = QueryInsightsSettings.DEFAULT_WINDOW_SIZE; this.windowStart = -1L; + this.exporter = null; topQueriesStore = new PriorityQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, metricType)); topQueriesCurrentSnapshot = new AtomicReference<>(new ArrayList<>()); topQueriesHistorySnapshot = new AtomicReference<>(new ArrayList<>()); @@ -169,6 +209,50 @@ public void validateWindowSize(final TimeValue windowSize) { } } + /** + * Set up the top queries exporter based on provided settings + * + * @param settings exporter config {@link Settings} + */ + public void setExporter(final Settings settings) { + if (settings.get(EXPORTER_TYPE) != null) { + SinkType expectedType = SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE)); + if (exporter != null && expectedType == SinkType.getSinkTypeFromExporter(exporter)) { + queryInsightsExporterFactory.updateExporter( + exporter, + settings.get(EXPORT_INDEX, DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN) + ); + } else { + try { + queryInsightsExporterFactory.closeExporter(this.exporter); + } catch (IOException e) { + logger.error("Fail to close the current exporter when updating exporter, error: ", e); + } + this.exporter = queryInsightsExporterFactory.createExporter( + SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE)), + settings.get(EXPORT_INDEX, DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN) + ); + } + } else { + // Disable exporter if exporter type is set to null + try { + queryInsightsExporterFactory.closeExporter(this.exporter); + this.exporter = null; + } catch (IOException e) { + logger.error("Fail to close the current exporter when disabling exporter, error: ", e); + } + } + } + + /** + * Validate provided settings for top queries exporter + * + * @param settings settings exporter config {@link Settings} + */ + public void validateExporterConfig(Settings settings) { + queryInsightsExporterFactory.validateExporterConfig(settings); + } + /** * Get all top queries records that are in the current top n queries store * Optionally include top N records from the last window. @@ -254,6 +338,10 @@ private void rotateWindowIfNecessary(final long newWindowStart) { topQueriesStore.clear(); topQueriesCurrentSnapshot.set(new ArrayList<>()); windowStart = newWindowStart; + // export to the configured sink + if (exporter != null) { + threadPool.executor(QUERY_INSIGHTS_EXECUTOR).execute(() -> exporter.export(history)); + } } } @@ -279,4 +367,11 @@ private long calculateWindowStart(final long timestamp) { public List getTopQueriesCurrentSnapshot() { return topQueriesCurrentSnapshot.get(); } + + /** + * Close the top n queries service + */ + public void close() throws IOException { + queryInsightsExporterFactory.closeExporter(this.exporter); + } } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java index 060711edb5580..fec00a680ae58 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java @@ -8,9 +8,11 @@ package org.opensearch.plugin.insights.rules.model; +import org.opensearch.core.common.Strings; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; @@ -173,4 +175,9 @@ public boolean equals(final Object o) { public int hashCode() { return Objects.hash(timestamp, measurements, attributes); } + + @Override + public String toString() { + return Strings.toString(MediaTypeRegistry.JSON, this); + } } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java index 52cc1fbde790f..b2e01062e334c 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java @@ -9,7 +9,9 @@ package org.opensearch.plugin.insights.settings; import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.plugin.insights.core.exporter.SinkType; import java.util.Arrays; import java.util.HashSet; @@ -109,6 +111,37 @@ public class QueryInsightsSettings { Setting.Property.Dynamic ); + /** + * Config key for exporter type + */ + public static final String EXPORTER_TYPE = "type"; + /** + * Config key for export index + */ + public static final String EXPORT_INDEX = "config.index"; + + /** + * Settings and defaults for top queries exporters + */ + private static final String TOP_N_LATENCY_QUERIES_EXPORTER_PREFIX = TOP_N_LATENCY_QUERIES_PREFIX + ".exporter."; + /** + * Default index pattern of top n queries by latency + */ + public static final String DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN = "'top_queries_by_latency-'YYYY.MM.dd"; + /** + * Default exporter type of top queries + */ + public static final String DEFAULT_TOP_QUERIES_EXPORTER_TYPE = SinkType.LOCAL_INDEX.toString(); + + /** + * Settings for the exporter of top latency queries + */ + public static final Setting TOP_N_LATENCY_EXPORTER_SETTINGS = Setting.groupSetting( + TOP_N_LATENCY_QUERIES_EXPORTER_PREFIX, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + /** * Default constructor */ diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java index 273b69e483e8c..15bf284652e13 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java @@ -49,6 +49,7 @@ public void setup() { clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS); clusterService = new ClusterService(settings, clusterSettings, threadPool); @@ -59,7 +60,8 @@ public void testGetSettings() { Arrays.asList( QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED, QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE, - QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE, + QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS ), queryInsightsPlugin.getSettings() ); diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/DebugExporterTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/DebugExporterTests.java new file mode 100644 index 0000000000000..736e406289b2c --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/DebugExporterTests.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.exporter; + +import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; + +import java.util.List; + +/** + * Granular tests for the {@link DebugExporterTests} class. + */ +public class DebugExporterTests extends OpenSearchTestCase { + private DebugExporter debugExporter; + + @Before + public void setup() { + debugExporter = DebugExporter.getInstance(); + } + + public void testExport() { + List records = QueryInsightsTestUtils.generateQueryInsightRecords(2); + try { + debugExporter.export(records); + } catch (Exception e) { + fail("No exception should be thrown when exporting query insights data"); + } + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java new file mode 100644 index 0000000000000..9ea864a7083f4 --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java @@ -0,0 +1,99 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.exporter; + +import org.opensearch.action.bulk.BulkAction; +import org.opensearch.action.bulk.BulkRequestBuilder; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.client.Client; +import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.test.OpenSearchTestCase; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.junit.Before; + +import java.util.List; + +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +/** + * Granular tests for the {@link LocalIndexExporterTests} class. + */ +public class LocalIndexExporterTests extends OpenSearchTestCase { + private final DateTimeFormatter format = DateTimeFormat.forPattern("YYYY.MM.dd"); + private final Client client = mock(Client.class); + private LocalIndexExporter localIndexExporter; + + @Before + public void setup() { + localIndexExporter = new LocalIndexExporter(client, format); + } + + public void testExportEmptyRecords() { + List records = List.of(); + try { + localIndexExporter.export(records); + } catch (Exception e) { + fail("No exception should be thrown when exporting empty query insights data"); + } + } + + @SuppressWarnings("unchecked") + public void testExportRecords() { + BulkRequestBuilder bulkRequestBuilder = spy(new BulkRequestBuilder(client, BulkAction.INSTANCE)); + final PlainActionFuture future = mock(PlainActionFuture.class); + when(future.actionGet()).thenReturn(null); + doAnswer(invocation -> future).when(bulkRequestBuilder).execute(); + when(client.prepareBulk()).thenReturn(bulkRequestBuilder); + + List records = QueryInsightsTestUtils.generateQueryInsightRecords(2); + try { + localIndexExporter.export(records); + } catch (Exception e) { + fail("No exception should be thrown when exporting query insights data"); + } + assertEquals(2, bulkRequestBuilder.numberOfActions()); + } + + @SuppressWarnings("unchecked") + public void testExportRecordsWithError() { + BulkRequestBuilder bulkRequestBuilder = spy(new BulkRequestBuilder(client, BulkAction.INSTANCE)); + final PlainActionFuture future = mock(PlainActionFuture.class); + when(future.actionGet()).thenReturn(null); + doThrow(new RuntimeException()).when(bulkRequestBuilder).execute(); + when(client.prepareBulk()).thenReturn(bulkRequestBuilder); + + List records = QueryInsightsTestUtils.generateQueryInsightRecords(2); + try { + localIndexExporter.export(records); + } catch (Exception e) { + fail("No exception should be thrown when exporting query insights data"); + } + } + + public void testClose() { + try { + localIndexExporter.close(); + } catch (Exception e) { + fail("No exception should be thrown when closing local index exporter"); + } + } + + public void testGetAndSetIndexPattern() { + DateTimeFormatter newFormatter = mock(DateTimeFormatter.class); + localIndexExporter.setIndexPattern(newFormatter); + assert (localIndexExporter.getIndexPattern() == newFormatter); + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactoryTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactoryTests.java new file mode 100644 index 0000000000000..f01dd2c17509c --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactoryTests.java @@ -0,0 +1,89 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.exporter; + +import org.opensearch.client.Client; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchTestCase; +import org.joda.time.format.DateTimeFormat; +import org.junit.Before; + +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_QUERIES_EXPORTER_TYPE; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORT_INDEX; +import static org.mockito.Mockito.mock; + +/** + * Granular tests for the {@link QueryInsightsExporterFactoryTests} class. + */ +public class QueryInsightsExporterFactoryTests extends OpenSearchTestCase { + private final String format = "YYYY.MM.dd"; + + private final Client client = mock(Client.class); + private QueryInsightsExporterFactory queryInsightsExporterFactory; + + @Before + public void setup() { + queryInsightsExporterFactory = new QueryInsightsExporterFactory(client); + } + + public void testValidateConfigWhenResetExporter() { + Settings.Builder settingsBuilder = Settings.builder(); + // empty settings + Settings settings = settingsBuilder.build(); + try { + queryInsightsExporterFactory.validateExporterConfig(settings); + } catch (Exception e) { + fail("No exception should be thrown when setting is null"); + } + } + + public void testInvalidExporterTypeConfig() { + Settings.Builder settingsBuilder = Settings.builder(); + Settings settings = settingsBuilder.put(EXPORTER_TYPE, "some_invalid_type").build(); + assertThrows(IllegalArgumentException.class, () -> { queryInsightsExporterFactory.validateExporterConfig(settings); }); + } + + public void testInvalidLocalIndexConfig() { + Settings.Builder settingsBuilder = Settings.builder(); + assertThrows(IllegalArgumentException.class, () -> { + queryInsightsExporterFactory.validateExporterConfig( + settingsBuilder.put(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE).put(EXPORT_INDEX, "").build() + ); + }); + assertThrows(IllegalArgumentException.class, () -> { + queryInsightsExporterFactory.validateExporterConfig( + settingsBuilder.put(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE).put(EXPORT_INDEX, "some_invalid_pattern").build() + ); + }); + } + + public void testCreateAndCloseExporter() { + QueryInsightsExporter exporter1 = queryInsightsExporterFactory.createExporter(SinkType.LOCAL_INDEX, format); + assertTrue(exporter1 instanceof LocalIndexExporter); + QueryInsightsExporter exporter2 = queryInsightsExporterFactory.createExporter(SinkType.DEBUG, format); + assertTrue(exporter2 instanceof DebugExporter); + QueryInsightsExporter exporter3 = queryInsightsExporterFactory.createExporter(SinkType.DEBUG, format); + assertTrue(exporter3 instanceof DebugExporter); + try { + queryInsightsExporterFactory.closeExporter(exporter1); + queryInsightsExporterFactory.closeExporter(exporter2); + queryInsightsExporterFactory.closeAllExporters(); + } catch (Exception e) { + fail("No exception should be thrown when closing exporter"); + } + } + + public void testUpdateExporter() { + LocalIndexExporter exporter = new LocalIndexExporter(client, DateTimeFormat.forPattern("yyyy-MM-dd")); + queryInsightsExporterFactory.updateExporter(exporter, "yyyy-MM-dd-HH"); + assertEquals(DateTimeFormat.forPattern("yyyy-MM-dd-HH"), exporter.getIndexPattern()); + } + +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java index c29b48b9690d1..428f615ce2f90 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java @@ -8,6 +8,9 @@ package org.opensearch.plugin.insights.core.service; +import org.opensearch.client.Client; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; import org.opensearch.plugin.insights.QueryInsightsTestUtils; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; @@ -23,11 +26,16 @@ */ public class QueryInsightsServiceTests extends OpenSearchTestCase { private final ThreadPool threadPool = mock(ThreadPool.class); + private final Client client = mock(Client.class); private QueryInsightsService queryInsightsService; @Before public void setup() { - queryInsightsService = new QueryInsightsService(threadPool); + Settings.Builder settingsBuilder = Settings.builder(); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS); + queryInsightsService = new QueryInsightsService(clusterSettings, threadPool, client); queryInsightsService.enableCollection(MetricType.LATENCY, true); queryInsightsService.enableCollection(MetricType.CPU, true); queryInsightsService.enableCollection(MetricType.JVM, true); @@ -46,4 +54,12 @@ public void testAddRecordToLimitAndDrain() { queryInsightsService.getTopQueriesService(MetricType.LATENCY).getTopQueriesRecords(false).size() ); } + + public void testClose() { + try { + queryInsightsService.doClose(); + } catch (Exception e) { + fail("No exception expected when closing query insights service"); + } + } } diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java index 060df84a89485..3efd4c86833cc 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java @@ -11,24 +11,30 @@ import org.opensearch.cluster.coordination.DeterministicTaskQueue; import org.opensearch.common.unit.TimeValue; import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; import org.opensearch.plugin.insights.settings.QueryInsightsSettings; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; import org.junit.Before; import java.util.List; import java.util.concurrent.TimeUnit; +import static org.mockito.Mockito.mock; + /** * Unit Tests for {@link QueryInsightsService}. */ public class TopQueriesServiceTests extends OpenSearchTestCase { private TopQueriesService topQueriesService; + private final ThreadPool threadPool = mock(ThreadPool.class); + private final QueryInsightsExporterFactory queryInsightsExporterFactory = mock(QueryInsightsExporterFactory.class); @Before public void setup() { - topQueriesService = new TopQueriesService(MetricType.LATENCY); + topQueriesService = new TopQueriesService(MetricType.LATENCY, threadPool, queryInsightsExporterFactory); topQueriesService.setTopNSize(Integer.MAX_VALUE); topQueriesService.setWindowSize(new TimeValue(Long.MAX_VALUE)); topQueriesService.setEnabled(true);