From c9fd4927afd7ecc25cc40d164118e89b931f2aeb Mon Sep 17 00:00:00 2001 From: Khushboo Rajput <59671881+khushbr@users.noreply.github.com> Date: Thu, 5 Oct 2023 18:43:12 -0700 Subject: [PATCH] Revert "Searchbackpressure Service Reader and UTs added (#427) (#495)" (#504) This reverts commit aef39818a6877fa925b158c22eeaf9a2c341fe5e. --- .../rca/framework/metrics/ReaderMetrics.java | 8 +- .../reader/MetricsEmitter.java | 130 ------------ .../reader/ReaderMetricsProcessor.java | 23 --- .../SearchBackPressureMetricsProcessor.java | 193 ------------------ .../SearchBackPressureMetricsSnapShot.java | 188 ----------------- ...earchBackPressureMetricsProcessorTest.java | 161 --------------- ...SearchBackPressureMetricsSnapShotTest.java | 136 ------------ 7 files changed, 1 insertion(+), 838 deletions(-) delete mode 100644 src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessor.java delete mode 100644 src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java delete mode 100644 src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessorTest.java delete mode 100644 src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShotTest.java diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/framework/metrics/ReaderMetrics.java b/src/main/java/org/opensearch/performanceanalyzer/rca/framework/metrics/ReaderMetrics.java index ec6ce0fd9..4c9fc5a04 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/framework/metrics/ReaderMetrics.java +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/framework/metrics/ReaderMetrics.java @@ -86,13 +86,7 @@ public enum ReaderMetrics implements MeasurementSet { "FaultDetectionMetricsEmitterExecutionTime", "millis", StatsType.LATENCIES, - Statistics.SUM), - SEARCH_BACK_PRESSURE_METRICS_EMITTER_EXECUTION_TIME( - "SearchBackPressureMetricsEmitterExecutionTime", - "millis", - StatsType.LATENCIES, - Statistics.SUM), - ; + Statistics.SUM); /** What we want to appear as the metric name. */ private String name; diff --git a/src/main/java/org/opensearch/performanceanalyzer/reader/MetricsEmitter.java b/src/main/java/org/opensearch/performanceanalyzer/reader/MetricsEmitter.java index c376e2bd6..5a225a0ba 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/reader/MetricsEmitter.java +++ b/src/main/java/org/opensearch/performanceanalyzer/reader/MetricsEmitter.java @@ -749,136 +749,6 @@ public static void emitGarbageCollectionInfo( ReaderMetrics.GC_INFO_EMITTER_EXECUTION_TIME, mFinalT - mCurrT); } - public static void emitSearchBackPressureMetrics( - MetricsDB metricsDB, - SearchBackPressureMetricsSnapShot searchBackPressureMetricsSnapShot) { - long mCurrT = System.currentTimeMillis(); - Result searchbp_records = searchBackPressureMetricsSnapShot.fetchAll(); - - // String SEARCHBP_MODE_DIM = "searchbp_mode"; - String SEARCHBP_TYPE_DIM = - AllMetrics.SearchBackPressureStatsValue.SEARCHBP_TYPE_DIM.toString(); - String SEARCHBP_TABLE_NAME = - AllMetrics.SearchBackPressureStatsValue.SEARCHBP_TABLE_NAME.toString(); - - List dims = - new ArrayList() { - { - this.add(SEARCHBP_TYPE_DIM); - } - }; - - // stats type in sqlitedb is similar to: - // stats_type_name | sum | avg | min | max - List stats_types = - new ArrayList() { - { - // Shard/Task Stats Cancellation Count - // searchbp_shard_stats_cancellationCount|0.0|0.0|0.0|0.0 - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT - .toString()); - // searchbp_task_stats_cancellationCount|0.0|0.0|0.0|0.0 - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_CANCELLATIONCOUNT - .toString()); - // Shard Stats Resource Heap / CPU Usage - // searchbp_shard_stats_resource_heap_usage_cancellationCount|0.0|0.0|0.0|0.0 - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT - .toString()); - // searchbp_shard_stats_resource_heap_usage_currentMax|0.0|0.0|0.0|0.0 - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX - .toString()); - // searchbp_shard_stats_resource_heap_usage_rollingAvg|0.0|0.0|0.0|0.0 - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG - .toString()); - // searchbp_shard_stats_resource_cpu_usage_cancellationCount|0.0|0.0|0.0|0.0 - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT - .toString()); - // searchbp_shard_stats_resource_cpu_usage_currentMax|0.0|0.0|0.0|0.0 - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTMAX - .toString()); - // searchbp_shard_stats_resource_cpu_usage_currentAvg|0.0|0.0|0.0|0.0 - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTAVG - .toString()); - // Task Stats Resource Heap / CPU Usage - // searchbp_task_stats_resource_heap_usage_cancellationCount|0.0|0.0|0.0|0.0 - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT - .toString()); - // searchbp_task_stats_resource_heap_usage_currentMax|0.0|0.0|0.0|0.0 - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX - .toString()); - // searchbp_task_stats_resource_heap_usage_rollingAvg|0.0|0.0|0.0|0.0 - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG - .toString()); - // searchbp_task_stats_resource_cpu_usage_cancellationCount|0.0|0.0|0.0|0.0 - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT - .toString()); - // searchbp_task_stats_resource_cpu_usage_currentMax|0.0|0.0|0.0|0.0 - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTMAX - .toString()); - // searchbp_task_stats_resource_cpu_usage_currentAvg|0.0|0.0|0.0|0.0 - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTAVG - .toString()); - } - }; - - metricsDB.createMetric(new Metric<>(SEARCHBP_TABLE_NAME, 0d), dims); - - BatchBindStep handle = metricsDB.startBatchPut(new Metric<>(SEARCHBP_TABLE_NAME, 0d), dims); - - for (Record record : searchbp_records) { - for (String stats_type : stats_types) { - Optional tmpStatsObj = Optional.ofNullable(record.get(stats_type)); - - handle.bind( - stats_type, - // the rest are agg fields: sum, avg, min, max which don't make sense for - // searchbackpressure - tmpStatsObj.map(o -> Long.parseLong(o.toString())).orElse(0L), - tmpStatsObj.map(o -> Long.parseLong(o.toString())).orElse(0L), - tmpStatsObj.map(o -> Long.parseLong(o.toString())).orElse(0L), - tmpStatsObj.map(o -> Long.parseLong(o.toString())).orElse(0L)); - } - } - - handle.execute(); - - long mFinalT = System.currentTimeMillis(); - LOG.debug( - "Total time taken for writing Search Back Pressure info into metricsDB: {}", - mFinalT - mCurrT); - ServiceMetrics.READER_METRICS_AGGREGATOR.updateStat( - ReaderMetrics.SEARCH_BACK_PRESSURE_METRICS_EMITTER_EXECUTION_TIME, - mFinalT - mCurrT); - } - public static void emitAdmissionControlMetrics( MetricsDB metricsDB, AdmissionControlSnapshot snapshot) { long mCurrT = System.currentTimeMillis(); diff --git a/src/main/java/org/opensearch/performanceanalyzer/reader/ReaderMetricsProcessor.java b/src/main/java/org/opensearch/performanceanalyzer/reader/ReaderMetricsProcessor.java index 3b446d95e..512c52f6d 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/reader/ReaderMetricsProcessor.java +++ b/src/main/java/org/opensearch/performanceanalyzer/reader/ReaderMetricsProcessor.java @@ -70,7 +70,6 @@ public class ReaderMetricsProcessor implements Runnable { clusterManagerThrottlingMetricsMap; private NavigableMap shardStateMetricsMap; private NavigableMap admissionControlMetricsMap; - private NavigableMap searchBackPressureMetricsMap; private static final int MAX_DATABASES = 2; private static final int OS_SNAPSHOTS = 4; @@ -82,7 +81,6 @@ public class ReaderMetricsProcessor implements Runnable { private static final int GC_INFO_SNAPSHOTS = 4; private static final int CLUSTER_MANAGER_THROTTLING_SNAPSHOTS = 2; private static final int AC_SNAPSHOTS = 2; - private static final int SEARCH_BP_SNAPSHOTS = 4; private final String rootLocation; private final AppContext appContext; @@ -127,8 +125,6 @@ public ReaderMetricsProcessor( gcInfoMap = new TreeMap<>(); clusterManagerThrottlingMetricsMap = new TreeMap<>(); admissionControlMetricsMap = new TreeMap<>(); - searchBackPressureMetricsMap = new TreeMap<>(); - this.rootLocation = rootLocation; this.configOverridesApplier = new ConfigOverridesApplier(); @@ -272,7 +268,6 @@ public void trimOldSnapshots() throws Exception { trimMap(gcInfoMap, GC_INFO_SNAPSHOTS); trimMap(clusterManagerThrottlingMetricsMap, CLUSTER_MANAGER_THROTTLING_SNAPSHOTS); trimMap(admissionControlMetricsMap, AC_SNAPSHOTS); - trimMap(searchBackPressureMetricsMap, SEARCH_BP_SNAPSHOTS); for (NavigableMap snap : nodeMetricsMap.values()) { // do the same thing as OS_SNAPSHOTS. Eventually MemoryDBSnapshot @@ -402,7 +397,6 @@ private void emitMetrics(long currWindowStartTime) throws Exception { emitAdmissionControlMetrics(prevWindowStartTime, metricsDB); emitClusterManagerMetrics(prevWindowStartTime, metricsDB); emitClusterManagerThrottlingMetrics(prevWindowStartTime, metricsDB); - emitSearchBackPressureMetrics(prevWindowStartTime, metricsDB); metricsDB.commit(); metricsDBMap.put(prevWindowStartTime, metricsDB); @@ -600,19 +594,6 @@ private void emitClusterManagerThrottlingMetrics( } } - private void emitSearchBackPressureMetrics(long prevWindowStartTime, MetricsDB metricsDB) - throws Exception { - if (searchBackPressureMetricsMap.containsKey(prevWindowStartTime)) { - SearchBackPressureMetricsSnapShot prevSearchBPSnapShot = - searchBackPressureMetricsMap.get(prevWindowStartTime); - MetricsEmitter.emitSearchBackPressureMetrics(metricsDB, prevSearchBPSnapShot); - } else { - LOG.debug( - "Search Back Pressure snapshot does not exist for the previous window. " - + "Not emitting metrics."); - } - } - /** * OS, Request, Http and cluster_manager first aligns the currentTimeStamp with a 5 second * interval. In the current format, a file (previously a directory) is written every 5 seconds. @@ -698,9 +679,6 @@ is ready so it starts to read that file (go back two windows and EventProcessor admissionControlProcessor = AdmissionControlProcessor.build( currWindowStartTime, conn, admissionControlMetricsMap); - EventProcessor searchBackPressureMetricsProcessor = - SearchBackPressureMetricsProcessor.buildSearchBackPressureMetricsProcessor( - currWindowStartTime, conn, searchBackPressureMetricsMap); // The event dispatcher dispatches events to each of the registered event processors. // In addition to event processing each processor has an initialize/finalize function that @@ -724,7 +702,6 @@ is ready so it starts to read that file (go back two windows and eventDispatcher.registerEventProcessor(faultDetectionProcessor); eventDispatcher.registerEventProcessor(garbageCollectorInfoProcessor); eventDispatcher.registerEventProcessor(admissionControlProcessor); - eventDispatcher.registerEventProcessor(searchBackPressureMetricsProcessor); eventDispatcher.initializeProcessing( currWindowStartTime, currWindowStartTime + MetricsConfiguration.SAMPLING_INTERVAL); diff --git a/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessor.java b/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessor.java deleted file mode 100644 index 6e9547754..000000000 --- a/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessor.java +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.performanceanalyzer.reader; - - -import java.sql.Connection; -import java.util.ArrayList; -import java.util.Map; -import java.util.NavigableMap; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.jooq.BatchBindStep; -import org.opensearch.performanceanalyzer.commons.event_process.Event; -import org.opensearch.performanceanalyzer.commons.event_process.EventProcessor; -import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics; -import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics; -import org.opensearch.performanceanalyzer.commons.util.JsonConverter; - -public class SearchBackPressureMetricsProcessor implements EventProcessor { - - private static final Logger LOG = - LogManager.getLogger(SearchBackPressureMetricsProcessor.class); - - // instance of SearchBackPressureMetricsSnapShot to interact with the backend db - private SearchBackPressureMetricsSnapShot searchBackPressureMetricsSnapShot; - - // entry point for batch queries - private BatchBindStep handle; - - // normally starTime and endTime are gapped by 5 seconds (default sampling interval) - private long startTime; - private long endTime; - - private SearchBackPressureMetricsProcessor( - SearchBackPressureMetricsSnapShot searchBackPressureMetricsSnapShot) { - this.searchBackPressureMetricsSnapShot = searchBackPressureMetricsSnapShot; - } - - /* - * if current SnapShotMap has the snapshot for currentWindowStartTime, use the snapshot to build the processor - * else create a new Instance of SearchBackPressureMetricsSnapShot to initialize the processor - */ - static SearchBackPressureMetricsProcessor buildSearchBackPressureMetricsProcessor( - long currentWindowStartTime, - Connection connection, - NavigableMap - searchBackPressureSnapshotNavigableMap) { - // if current metrics is in searchBackPressureSnapshotNavigableMap map - if (searchBackPressureSnapshotNavigableMap.get(currentWindowStartTime) == null) { - SearchBackPressureMetricsSnapShot searchBackPressureMetricsSnapShot = - new SearchBackPressureMetricsSnapShot(connection, currentWindowStartTime); - searchBackPressureSnapshotNavigableMap.put( - currentWindowStartTime, searchBackPressureMetricsSnapShot); - return new SearchBackPressureMetricsProcessor(searchBackPressureMetricsSnapShot); - } - return new SearchBackPressureMetricsProcessor( - searchBackPressureSnapshotNavigableMap.get(currentWindowStartTime)); - } - - @Override - public void initializeProcessing(long startTime, long endTime) { - this.startTime = startTime; - this.endTime = endTime; - this.handle = searchBackPressureMetricsSnapShot.startBatchPut(); - } - - @Override - public void finalizeProcessing() { - if (handle.size() > 0) { - handle.execute(); - } - } - - @Override - public boolean shouldProcessEvent(Event event) { - return event.key.contains(PerformanceAnalyzerMetrics.sSearchBackPressureMetricsPath); - } - - @Override - public void commitBatchIfRequired() { - if (handle.size() >= BATCH_LIMIT) { - handle.execute(); - handle = searchBackPressureMetricsSnapShot.startBatchPut(); - } - } - - // Handler method for incoming events - private void handleSearchBackPressureEvent(String eventValue) { - String[] lines = eventValue.split(System.lineSeparator()); - if (lines.length < 2) { - throw new RuntimeException("Missing SearchBackPressure Metrics payload and timestamp."); - } - - // Parse metrics payload - parseJsonLine(lines[1]); - } - - private void parseJsonLine(final String jsonString) { - Map map = JsonConverter.createMapFrom(jsonString); - - if (map.isEmpty()) { - throw new RuntimeException("Missing SearchBackPressure Metrics payload."); - } - - // A list of dims to be collected - ArrayList required_searchbp_dims = - new ArrayList() { - { - // Shard/Task Stats Cancellation Count - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT - .toString()); - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_CANCELLATIONCOUNT - .toString()); - - // Shard Stats Resource Heap / CPU Usage - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT - .toString()); - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX - .toString()); - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG - .toString()); - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT - .toString()); - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTMAX - .toString()); - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTAVG - .toString()); - - // Task Stats Resource Heap / CPU Usage - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT - .toString()); - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX - .toString()); - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG - .toString()); - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT - .toString()); - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTMAX - .toString()); - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTAVG - .toString()); - } - }; - - Object[] bindVals = new Object[required_searchbp_dims.size()]; - int idx = 0; - for (String dimension : required_searchbp_dims) { - bindVals[idx++] = map.get(dimension); - } - - handle.bind(bindVals); - } - - @Override - public void processEvent(Event event) { - // Handler method for incoming event - handleSearchBackPressureEvent(event.value); - - // commit Batch queries is overflow the limit - commitBatchIfRequired(); - } -} diff --git a/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java b/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java deleted file mode 100644 index bcf5c09ad..000000000 --- a/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.performanceanalyzer.reader; - - -import java.sql.Connection; -import java.util.ArrayList; -import java.util.List; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.jooq.BatchBindStep; -import org.jooq.DSLContext; -import org.jooq.Field; -import org.jooq.Record; -import org.jooq.Result; -import org.jooq.SQLDialect; -import org.jooq.impl.DSL; -import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics; - -/* - * SearchBackPressure cluster/node-level RCA would consume these data in the snapshots and determine whether the search back pressure service - * has cancelled too much/ too less requests, by comparing with predefined threshold. - */ -public class SearchBackPressureMetricsSnapShot implements Removable { - - // Logger for current class - private static final Logger LOG = LogManager.getLogger(SearchBackPressureMetricsSnapShot.class); - - // entry point to interact with SQLite db - private final DSLContext create; - - /* - * This is a tmp table created to populate searchbp stats - * table name is the search_back_pressure_ + windowStartTime - */ - private final String tableName; - - /* columns are the key metrics to be collected (e.g. shar-level search back pressure cancellation count) - */ - private List> columns; - - // Create a table with specifed fields (columns) - public SearchBackPressureMetricsSnapShot(Connection conn, Long windowStartTime) { - this.create = DSL.using(conn, SQLDialect.SQLITE); - this.tableName = "search_back_pressure_" + windowStartTime; - - // Add the ControllerName, searchbp_mode columns in the table - this.columns = - new ArrayList>() { - { - // Shard/Task Stats Cancellation Count - this.add( - DSL.field( - DSL.name( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT - .toString()), - Integer.class)); - this.add( - DSL.field( - DSL.name( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_CANCELLATIONCOUNT - .toString()), - Integer.class)); - - // Shard Stats Resource Heap / CPU Usage - this.add( - DSL.field( - DSL.name( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT - .toString()), - Integer.class)); - this.add( - DSL.field( - DSL.name( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX - .toString()), - Integer.class)); - this.add( - DSL.field( - DSL.name( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG - .toString()), - Long.class)); - this.add( - DSL.field( - DSL.name( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT - .toString()), - Integer.class)); - - this.add( - DSL.field( - DSL.name( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTMAX - .toString()), - Integer.class)); - this.add( - DSL.field( - DSL.name( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTAVG - .toString()), - Long.class)); - - // Task Stats Resource Heap / CPU Usage - this.add( - DSL.field( - DSL.name( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT - .toString()), - Integer.class)); - this.add( - DSL.field( - DSL.name( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX - .toString()), - Integer.class)); - this.add( - DSL.field( - DSL.name( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG - .toString()), - Long.class)); - this.add( - DSL.field( - DSL.name( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT - .toString()), - Integer.class)); - - this.add( - DSL.field( - DSL.name( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTMAX - .toString()), - Integer.class)); - this.add( - DSL.field( - DSL.name( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTAVG - .toString()), - Long.class)); - } - }; - - // create table with columns specified - create.createTable(tableName).columns(columns).execute(); - } - - public DSLContext getDSLContext() { - return create; - } - - public BatchBindStep startBatchPut() { - // Add dummy values because jooq requires this to support multiple bind statements with - // single insert query - List dummyValues = new ArrayList<>(); - for (int i = 0; i < columns.size(); i++) { - dummyValues.add(null); - } - return create.batch(create.insertInto(DSL.table(this.tableName)).values(dummyValues)); - } - - public Result fetchAll() { - return create.select().from(DSL.table(tableName)).fetch(); - } - - @Override - public void remove() throws Exception { - create.dropTable(DSL.table(tableName)).execute(); - } -} diff --git a/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessorTest.java b/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessorTest.java deleted file mode 100644 index 575712a2d..000000000 --- a/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessorTest.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.performanceanalyzer.reader; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.util.NavigableMap; -import java.util.TreeMap; -import org.jooq.Record; -import org.jooq.Result; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.opensearch.performanceanalyzer.commons.event_process.Event; -import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics; -import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics; - -public class SearchBackPressureMetricsProcessorTest { - private static final String DB_URL = "jdbc:sqlite:"; - // private static final String TEST_MEM_POOL = "testMemPool"; - // private static final String COLLECTOR_NAME = "testCollectorName"; - private static final String SEARCH_BACK_PRESSURE_STATS_KEY = "search_back_pressure_stats"; - private SearchBackPressureMetricsProcessor searchBackPressureMetricsProcessor; - private long currTimeStamp; - - private NavigableMap searchBackPressureStatsMap; - Connection conn; - - // mock SearchBackPressureStatsCollector to test Event processing - private static final String SERIALIZED_EVENT = - "{\"searchbp_shard_stats_cancellationCount\":2," - + "\"searchbp_shard_stats_limitReachedCount\":2," - + "\"searchbp_shard_stats_resource_heap_usage_cancellationCount\":3," - + "\"searchbp_shard_stats_resource_heap_usage_currentMax\":3," - + "\"searchbp_shard_stats_resource_heap_usage_rollingAvg\":3," - + "\"searchbp_shard_stats_resource_cpu_usage_cancellationCount\":5," - + "\"searchbp_shard_stats_resource_cpu_usage_currentMax\":5," - + "\"searchbp_shard_stats_resource_cpu_usage_currentAvg\":5," - + "\"searchbp_shard_stats_resource_elaspedtime_usage_cancellationCount\":2," - + "\"searchbp_shard_stats_resource_elaspedtime_usage_currentMax\":2," - + "\"searchbp_shard_stats_resource_elaspedtime_usage_currentAvg\":2," - + "\"searchbp_task_stats_cancellationCount\":0," - + "\"searchbp_task_stats_limitReachedCount\":0," - + "\"searchbp_task_stats_resource_heap_usage_cancellationCount\":0," - + "\"searchbp_task_stats_resource_heap_usage_currentMax\":0," - + "\"searchbp_task_stats_resource_heap_usage_rollingAvg\":0," - + "\"searchbp_task_stats_resource_cpu_usage_cancellationCount\":0," - + "\"searchbp_task_stats_resource_cpu_usage_currentMax\":0," - + "\"searchbp_task_stats_resource_cpu_usage_currentAvg\":0," - + "\"searchbp_task_stats_resource_elaspedtime_usage_cancellationCount\":0," - + "\"searchbp_task_stats_resource_elaspedtime_usage_currentMax\":0," - + "\"searchbp_task_stats_resource_elaspedtime_usage_currentAvg\":0," - + "\"searchbp_mode\":\"MONITOR_ONLY\"," - + "\"searchbp_nodeid\":\"FgNAAAQQQDSROABCDEFHTX\"}"; - - @Before - public void setup() throws Exception { - Class.forName("org.sqlite.JDBC"); - System.setProperty("java.io.tmpdir", "/tmp"); - conn = DriverManager.getConnection(DB_URL); - this.currTimeStamp = System.currentTimeMillis(); - this.searchBackPressureStatsMap = new TreeMap<>(); - this.searchBackPressureMetricsProcessor = - searchBackPressureMetricsProcessor.buildSearchBackPressureMetricsProcessor( - currTimeStamp, conn, searchBackPressureStatsMap); - } - - // Test valid case of the handleSearchBackPressureEvent() - @Test - public void testSearchBackPressureProcessEvent() throws Exception { - // Create a SearchBackPressureEvent - Event testEvent = buildTestSearchBackPressureStatsEvent(); - - // Test the SearchBackPressureMetricsSnapShot - searchBackPressureMetricsProcessor.initializeProcessing( - this.currTimeStamp, System.currentTimeMillis()); - assertTrue(searchBackPressureMetricsProcessor.shouldProcessEvent(testEvent)); - - searchBackPressureMetricsProcessor.processEvent(testEvent); - searchBackPressureMetricsProcessor.finalizeProcessing(); - - SearchBackPressureMetricsSnapShot currSnapshot = - searchBackPressureStatsMap.get(this.currTimeStamp); - Result result = currSnapshot.fetchAll(); - assertEquals(1, result.size()); - - // SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG value is 3L according to the - // SERIALIZED_EVENT, should EQUAL - Assert.assertEquals( - 3L, - result.get(0) - .get( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG - .toString())); - // SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT value is 0L according to the - // SERIALIZED_EVENT, should EQUAL - Assert.assertEquals( - 0, - result.get(0) - .get( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT - .toString())); - - // SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT value is 0L according to the - // SERIALIZED_EVENT, should NOT EQUAL - Assert.assertNotEquals( - 2, - result.get(0) - .get( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT - .toString())); - } - - @Test - public void testEmptySearchBackPressureProcessEvent() throws Exception { - // Create a SearchBackPressureEvent - Event testEvent = buildEmptyTestSearchBackPressureStatsEvent(); - - // Test the SearchBackPressureMetricsSnapShot - searchBackPressureMetricsProcessor.initializeProcessing( - this.currTimeStamp, System.currentTimeMillis()); - assertTrue(searchBackPressureMetricsProcessor.shouldProcessEvent(testEvent)); - - try { - searchBackPressureMetricsProcessor.processEvent(testEvent); - Assert.assertFalse( - "Negative scenario test: Should catch a RuntimeException and skip this test", - true); - } catch (RuntimeException ex) { - // should catch the exception and the previous assertion should not be executed - } - } - - private Event buildTestSearchBackPressureStatsEvent() { - StringBuilder str = new StringBuilder(); - str.append(PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds()) - .append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor); - - str.append(SERIALIZED_EVENT).append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor); - return new Event( - SEARCH_BACK_PRESSURE_STATS_KEY, str.toString(), System.currentTimeMillis()); - } - - private Event buildEmptyTestSearchBackPressureStatsEvent() { - StringBuilder str = new StringBuilder(); - str.append(PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds()) - .append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor); - - return new Event( - SEARCH_BACK_PRESSURE_STATS_KEY, str.toString(), System.currentTimeMillis()); - } -} diff --git a/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShotTest.java b/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShotTest.java deleted file mode 100644 index 2e88aa574..000000000 --- a/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShotTest.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.performanceanalyzer.reader; - -import static org.junit.Assert.assertEquals; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.util.ArrayList; -import org.jooq.BatchBindStep; -import org.jooq.Record; -import org.jooq.Result; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics; - -public class SearchBackPressureMetricsSnapShotTest { - private static final String DB_URL = "jdbc:sqlite:"; - private Connection conn; - SearchBackPressureMetricsSnapShot snapshot; - - ArrayList required_searchbp_dims = - new ArrayList() { - { - // Shard/Task Stats Cancellation Count - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT - .toString()); - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_CANCELLATIONCOUNT - .toString()); - - // Shard Stats Resource Heap / CPU Usage - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT - .toString()); - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX - .toString()); - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG - .toString()); - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT - .toString()); - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTMAX - .toString()); - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTAVG - .toString()); - - // Task Stats Resource Heap / CPU Usage - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT - .toString()); - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX - .toString()); - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG - .toString()); - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT - .toString()); - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTMAX - .toString()); - this.add( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTAVG - .toString()); - } - }; - - @Before - public void setup() throws Exception { - Class.forName("org.sqlite.JDBC"); - System.setProperty("java.io.tmpdir", "/tmp"); - conn = DriverManager.getConnection(DB_URL); - snapshot = new SearchBackPressureMetricsSnapShot(conn, System.currentTimeMillis()); - } - - @Test - public void testReadSearchBackPressureMetricsSnapshot() throws Exception { - final BatchBindStep handle = snapshot.startBatchPut(); - insertIntoTable(handle); - - final Result result = snapshot.fetchAll(); - - assertEquals(1, result.size()); - // for 14 (length of required_searchbp_dims) fields, each assign a value from 0 to 13 - // test each field and verify the result - for (int i = 0; i < required_searchbp_dims.size(); i++) { - Assert.assertEquals( - AllMetrics.SearchBackPressureStatsValue.SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT - .toString() - + " should be " - + String.valueOf(i), - i, - ((Number) result.get(0).get(required_searchbp_dims.get(i))).intValue()); - } - } - - @After - public void tearDown() throws Exception { - conn.close(); - } - - private void insertIntoTable(BatchBindStep handle) { - Object[] bindVals = new Object[required_searchbp_dims.size()]; - for (int i = 0; i < required_searchbp_dims.size(); i++) { - bindVals[i] = Integer.valueOf(i); - } - - handle.bind(bindVals).execute(); - } -}