Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add rollover and delete history custom result index conditions #16

Open
wants to merge 3 commits into
base: forecasting18_3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package org.opensearch.timeseries.indices;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.forecast.constant.ForecastCommonName.CUSTOM_RESULT_INDEX_PREFIX;
import static org.opensearch.timeseries.util.RestHandlerUtils.createXContentParserFromRegistry;

import java.io.IOException;
Expand Down Expand Up @@ -51,6 +52,7 @@
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.cluster.LocalNodeClusterManagerListener;
Expand All @@ -66,6 +68,8 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParser.Token;
Expand Down Expand Up @@ -298,7 +302,8 @@ protected void deleteOldHistoryIndices(String indexPattern, TimeValue historyRet
long latest = Long.MIN_VALUE;
for (IndexMetadata indexMetaData : clusterStateResponse.getState().metadata().indices().values()) {
long creationTime = indexMetaData.getCreationDate();
if ((Instant.now().toEpochMilli() - creationTime) > historyRetentionPeriod.millis()) {
long indexAgeMillis = Instant.now().toEpochMilli() - creationTime;
if (indexAgeMillis > historyRetentionPeriod.millis()) {
String indexName = indexMetaData.getIndex().getName();
candidates.add(indexName);
if (latest < creationTime) {
Expand Down Expand Up @@ -1100,6 +1105,7 @@ protected void rescheduleRollover() {
if (scheduledRollover != null) {
scheduledRollover.cancel();
}

scheduledRollover = threadPool
.scheduleWithFixedDelay(() -> rolloverAndDeleteHistoryIndex(), historyRolloverPeriod, executorName());
}
Expand Down Expand Up @@ -1234,35 +1240,94 @@ protected void rolloverAndDeleteHistoryIndex(
String rolloverIndexPattern,
IndexType resultIndex
) {
if (!doesDefaultResultIndexExist()) {
return;
// perform rollover and delete on default result index
if (doesDefaultResultIndexExist()) {
RolloverRequest defaultResultIndexRolloverRequest = buildRolloverRequest(resultIndexAlias, rolloverIndexPattern);
defaultResultIndexRolloverRequest.addMaxIndexDocsCondition(historyMaxDocs * getNumberOfPrimaryShards());
proceedWithDefaultRolloverAndDelete(resultIndexAlias, defaultResultIndexRolloverRequest, allResultIndicesPattern, resultIndex);
}
// get config files that have custom result index alias to perform rollover on
getConfigsWithCustomResultIndexAlias(ActionListener.wrap(candidateResultAliases -> {
if (candidateResultAliases == null || candidateResultAliases.isEmpty()) {
logger.info("Candidate custom result indices are empty.");
return;
}

// We have to pass null for newIndexName in order to get Elastic to increment the index count.
RolloverRequest rollOverRequest = new RolloverRequest(resultIndexAlias, null);
// perform rollover and delete on found custom result index alias
candidateResultAliases.forEach(config -> handleCustomResultIndex(config, resultIndex));
}, e -> { logger.error("Failed to get configs with custom result index alias.", e); }));
}

private void handleCustomResultIndex(Config config, IndexType resultIndex) {
RolloverRequest rolloverRequest = buildRolloverRequest(
config.getCustomResultIndexOrAlias(),
getCustomResultIndexPattern(config.getCustomResultIndexOrAlias())
);

// add rollover conditions if found in config
if (config.getCustomResultIndexMinAge() != null) {
rolloverRequest.addMaxIndexAgeCondition(TimeValue.timeValueDays(config.getCustomResultIndexMinAge()));
}
if (config.getCustomResultIndexMinSize() != null) {
rolloverRequest.addMaxIndexSizeCondition(new ByteSizeValue(config.getCustomResultIndexMinSize(), ByteSizeUnit.MB));
}

// perform rollover and delete on custom result index alias
proceedWithRolloverAndDelete(
config.getCustomResultIndexOrAlias(),
rolloverRequest,
getAllCustomResultIndexPattern(config.getCustomResultIndexOrAlias()),
resultIndex,
config.getCustomResultIndexTTL()
);
}

private void proceedWithDefaultRolloverAndDelete(
String resultIndexAlias,
RolloverRequest rolloverRequest,
String allResultIndicesPattern,
IndexType resultIndex
) {
proceedWithRolloverAndDelete(resultIndexAlias, rolloverRequest, allResultIndicesPattern, resultIndex, null);
}

private RolloverRequest buildRolloverRequest(String resultIndexAlias, String rolloverIndexPattern) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for custom result index, we don't need addMaxIndexDocsCondition. For default result index, we only need addMaxIndexDocsCondition. In this way, if customers don't elect any ISM option from AD dashboard, we won't touch it and cx can decide on their own. I think you can put addMaxIndexDocsCondition outside and only do it if you are handling default result index.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed to only build addMaxIndexDocsCondition when performing rollover on default result index

RolloverRequest rollOverRequest = new RolloverRequest(resultIndexAlias, null);
CreateIndexRequest createRequest = rollOverRequest.getCreateIndexRequest();

createRequest.index(rolloverIndexPattern).mapping(resultMapping, XContentType.JSON);

choosePrimaryShards(createRequest, true);

rollOverRequest.addMaxIndexDocsCondition(historyMaxDocs * getNumberOfPrimaryShards());
return rollOverRequest;
}

private void proceedWithRolloverAndDelete(
String resultIndexAlias,
RolloverRequest rollOverRequest,
String allResultIndicesPattern,
IndexType resultIndex,
Integer customResultIndexTtl
) {
adminClient.indices().rolloverIndex(rollOverRequest, ActionListener.wrap(response -> {
if (!response.isRolledOver()) {
logger.warn("{} not rolled over. Conditions were: {}", resultIndexAlias, response.getConditionStatus());
} else {
IndexState indexStatetate = indexStates.computeIfAbsent(resultIndex, k -> new IndexState(k.getMapping()));
indexStatetate.mappingUpToDate = true;
IndexState indexState = indexStates.computeIfAbsent(resultIndex, k -> new IndexState(k.getMapping()));
indexState.mappingUpToDate = true;
logger.info("{} rolled over. Conditions were: {}", resultIndexAlias, response.getConditionStatus());
deleteOldHistoryIndices(allResultIndicesPattern, historyRetentionPeriod);
if (resultIndexAlias.startsWith(ADCommonName.CUSTOM_RESULT_INDEX_PREFIX)
|| resultIndexAlias.startsWith(CUSTOM_RESULT_INDEX_PREFIX)) {
// handle custom result index deletion
if (customResultIndexTtl != null) {
deleteOldHistoryIndices(allResultIndicesPattern, TimeValue.timeValueHours(customResultIndexTtl * 24));

}
} else {
// handle default result index deletion
deleteOldHistoryIndices(allResultIndicesPattern, historyRetentionPeriod);
}
}
}, exception -> {
// e.g., we may roll over too often. Since the index pattern is opensearch-ad-plugin-result-d-history-{now/d}-000001,
// we cannot roll over twice in the same day as the index with the same name exists. We will get
// resource_already_exists_exception.
logger.error("Fail to roll over result index", exception);
}));
}, exception -> { logger.error("Fail to roll over result index", exception); }));
}

protected void initResultIndexDirectly(
Expand Down
156 changes: 154 additions & 2 deletions src/test/java/org/opensearch/ad/indices/RolloverTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,54 +18,75 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.timeseries.TestHelpers.createSearchResponse;

import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;

import org.apache.lucene.search.TotalHits;
import org.opensearch.Version;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.rollover.Condition;
import org.opensearch.action.admin.indices.rollover.MaxAgeCondition;
import org.opensearch.action.admin.indices.rollover.MaxDocsCondition;
import org.opensearch.action.admin.indices.rollover.MaxSizeCondition;
import org.opensearch.action.admin.indices.rollover.RolloverRequest;
import org.opensearch.action.admin.indices.rollover.RolloverResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.client.ClusterAdminClient;
import org.opensearch.client.IndicesAdminClient;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.AliasMetadata;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.internal.InternalSearchResponse;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.AbstractTimeSeriesTest;
import org.opensearch.timeseries.TestHelpers;
import org.opensearch.timeseries.settings.TimeSeriesSettings;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;

public class RolloverTests extends AbstractTimeSeriesTest {
private ADIndexManagement adIndices;
private IndicesAdminClient indicesClient;
private ClusterAdminClient clusterAdminClient;
private Client client;
private ClusterName clusterName;
private ClusterState clusterState;
private ClusterService clusterService;
private NamedXContentRegistry namedXContentRegistry;
private long defaultMaxDocs;
private int numberOfNodes;

@Override
public void setUp() throws Exception {
super.setUp();
Client client = mock(Client.class);
client = mock(Client.class);
indicesClient = mock(IndicesAdminClient.class);
AdminClient adminClient = mock(AdminClient.class);
clusterService = mock(ClusterService.class);
Expand Down Expand Up @@ -98,14 +119,16 @@ public void setUp() throws Exception {
numberOfNodes = 2;
when(nodeFilter.getNumberOfEligibleDataNodes()).thenReturn(numberOfNodes);

namedXContentRegistry = TestHelpers.xContentRegistry();

adIndices = new ADIndexManagement(
client,
clusterService,
threadPool,
settings,
nodeFilter,
TimeSeriesSettings.MAX_UPDATE_RETRY_TIMES,
NamedXContentRegistry.EMPTY
namedXContentRegistry
);

clusterAdminClient = mock(ClusterAdminClient.class);
Expand Down Expand Up @@ -248,4 +271,133 @@ public void testRetryingDelete() {
// 1 group delete, 1 separate retry for each index to delete
verify(indicesClient, times(2)).delete(any(), any());
}

public void testNoCustomResultIndexFound_RolloverDefaultResultIndex_shouldSucceed() {
setUpRolloverSuccess();
setUpGetConfigs_withNoCustomResultIndexAlias();

adIndices.rolloverAndDeleteHistoryIndex();
verify(indicesClient, times(1)).rolloverIndex(any(), any());
verify(client, times(1)).search(any(), any());
}

public void testCustomResultIndexFound_RolloverCustomResultIndex_withConditions_shouldSucceed() throws IOException {
setUpGetConfigs_withCustomResultIndexAlias();

adIndices.rolloverAndDeleteHistoryIndex();

verify(indicesClient, times(1)).rolloverIndex(any(), any());
verify(client, times(1)).search(any(), any());
}

private void setUpGetConfigs_withNoCustomResultIndexAlias() {
Metadata.Builder metaBuilder = Metadata
.builder()
.put(indexMeta(".opendistro-anomaly-detectors", 1L, ADCommonName.ANOMALY_RESULT_INDEX_ALIAS), true);
clusterState = ClusterState.builder(clusterName).metadata(metaBuilder.build()).build();
when(clusterService.state()).thenReturn(clusterState);

String detectorString = "{\"name\":\"AhtYYGWTgqkzairTchcs\",\"description\":\"iIiAVPMyFgnFlEniLbMyfJxyoGvJAl\","
+ "\"time_field\":\"HmdFH\",\"indices\":[\"ffsBF\"],\"filter_query\":{\"bool\":{\"filter\":[{\"exists\":"
+ "{\"field\":\"value\",\"boost\":1}}],\"adjust_pure_negative\":true,\"boost\":1}},\"window_delay\":"
+ "{\"period\":{\"interval\":2,\"unit\":\"Minutes\"}},\"shingle_size\":8,\"schema_version\":-512063255,"
+ "\"feature_attributes\":[{\"feature_id\":\"OTYJs\",\"feature_name\":\"eYYCM\",\"feature_enabled\":false,"
+ "\"aggregation_query\":{\"XzewX\":{\"value_count\":{\"field\":\"ok\"}}}}],\"recency_emphasis\":3342,"
+ "\"history\":62,\"last_update_time\":1717192049845,\"category_field\":[\"Tcqcb\"],\"customResultIndexOrAlias\":"
+ "\"\",\"imputation_option\":{\"method\":\"FIXED_VALUES\",\"defaultFill\""
+ ":[],\"integerSensitive\":false},\"suggested_seasonality\":64,\"detection_interval\":{\"period\":"
+ "{\"interval\":5,\"unit\":\"Minutes\"}},\"detector_type\":\"MULTI_ENTITY\",\"rules\":[]}";

doAnswer(invocation -> {
ActionListener<SearchResponse> listener = invocation.getArgument(1);
SearchHit config = SearchHit.fromXContent(TestHelpers.parser(detectorString));
SearchHits searchHits = new SearchHits(new SearchHit[] { config }, new TotalHits(1, TotalHits.Relation.EQUAL_TO), Float.NaN);
InternalSearchResponse response = new InternalSearchResponse(
searchHits,
InternalAggregations.EMPTY,
null,
null,
false,
null,
1
);
SearchResponse searchResponse = new SearchResponse(
response,
null,
1,
1,
0,
100,
ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY
);
listener.onResponse(searchResponse);
return null;
}).when(client).search(any(), any());
}

private void setUpRolloverSuccessForCustomIndex() {
doAnswer(invocation -> {
RolloverRequest request = invocation.getArgument(0);
@SuppressWarnings("unchecked")
ActionListener<RolloverResponse> listener = (ActionListener<RolloverResponse>) invocation.getArgument(1);

assertEquals("opensearch-ad-plugin-result-", request.indices()[0]);
Map<String, Condition<?>> conditions = request.getConditions();
assertEquals(2, conditions.size());
assertEquals(new MaxAgeCondition(TimeValue.timeValueDays(7)), conditions.get(MaxAgeCondition.NAME));
assertEquals(new MaxSizeCondition(new ByteSizeValue(51200, ByteSizeUnit.MB)), conditions.get(MaxSizeCondition.NAME));

CreateIndexRequest createIndexRequest = request.getCreateIndexRequest();
assertEquals("<opensearch-ad-plugin-result--history-{now/d}-1>", createIndexRequest.index());
assertTrue(createIndexRequest.mappings().contains("data_start_time"));
listener.onResponse(new RolloverResponse(null, null, Collections.emptyMap(), request.isDryRun(), true, true, true));
return null;
}).when(indicesClient).rolloverIndex(any(), any());
}

private void setUpGetConfigs_withCustomResultIndexAlias() throws IOException {
IndexMetadata defaultResultIndex = IndexMetadata
.builder(".opendistro-anomaly-detectors")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(0)
.build();
IndexMetadata customResultIndex = IndexMetadata
.builder("opensearch-ad-plugin-result-test")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(0)
.putAlias(AliasMetadata.builder(ADCommonName.CUSTOM_RESULT_INDEX_PREFIX).writeIndex(true).build())
.build();

clusterState = ClusterState
.builder(ClusterState.EMPTY_STATE)
.metadata(Metadata.builder().put(defaultResultIndex, false).put(customResultIndex, false).build())
.build();

when(clusterService.state()).thenReturn(clusterState);

String detectorStringWithCustomResultIndex =
"{\"name\":\"todagtCMkwpcaedpyYUM\",\"description\":\"ClrcaMpuLfeDSlVduRcKlqPZyqWDBf\","
+ "\"time_field\":\"dJRwh\",\"indices\":[\"eIrgWMqAED\"],\"feature_attributes\":[{\"feature_id\":\"lxYRN\","
+ "\"feature_name\":\"eqSeU\",\"feature_enabled\":true,\"aggregation_query\":{\"aa\":{\"value_count\":{\"field\":\"ok\"}}}}],"
+ "\"detection_interval\":{\"period\":{\"interval\":425,\"unit\":\"Minutes\"}},"
+ "\"window_delay\":{\"period\":{\"interval\":973,\"unit\":\"Minutes\"}},\"shingle_size\":4,\"schema_version\":-1203962153,"
+ "\"ui_metadata\":{\"JbAaV\":{\"feature_id\":\"rIFjS\",\"feature_name\":\"QXCmS\",\"feature_enabled\":false,"
+ "\"aggregation_query\":{\"aa\":{\"value_count\":{\"field\":\"ok\"}}}}},\"last_update_time\":1568396089028,"
+ "\"result_index\":\"opensearch-ad-plugin-result-\",\"result_index_min_size\":51200,\"result_index_min_age\":7}";

AnomalyDetector parsedDetector = AnomalyDetector
.parse(TestHelpers.parser(detectorStringWithCustomResultIndex), "id", 1L, null, null);

doAnswer(invocation -> {
Object[] args = invocation.getArguments();
ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) args[1];
setUpRolloverSuccessForCustomIndex();
listener.onResponse(createSearchResponse(parsedDetector));
return null;
}).when(client).search(any(), any());

}
}