From 928040ee765096224e4411d46a011f0bce3da9da Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 23 Jan 2025 10:56:18 +0000 Subject: [PATCH] [ML] Automatically rollover legacy ml indices (#120405) Rollover ml indices created in 7.x and create new indices that version 9 can read and write to. This is required for ml to continue to run after during upgrade and reindex of 7.x indices --- docs/changelog/120405.yaml | 5 + .../org/elasticsearch/TransportVersions.java | 1 + .../core/ml/annotations/AnnotationIndex.java | 1 + .../xpack/core/ml/utils/MlIndexAndAlias.java | 25 +- .../core/ml/utils/MlIndexAndAliasTests.java | 9 +- .../xpack/ml/MachineLearning.java | 24 +- .../xpack/ml/MlAutoUpdateService.java | 23 +- .../xpack/ml/MlIndexRollover.java | 176 +++++++++++ .../datafeed/DatafeedConfigAutoUpdater.java | 2 +- .../xpack/ml/MlIndexRolloverTests.java | 283 ++++++++++++++++++ .../DatafeedConfigAutoUpdaterTests.java | 6 +- 11 files changed, 535 insertions(+), 20 deletions(-) create mode 100644 docs/changelog/120405.yaml create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexRollover.java create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlIndexRolloverTests.java diff --git a/docs/changelog/120405.yaml b/docs/changelog/120405.yaml new file mode 100644 index 0000000000000..9ca30a9473e7a --- /dev/null +++ b/docs/changelog/120405.yaml @@ -0,0 +1,5 @@ +pr: 120405 +summary: Automatically rollover legacy ml indices +area: Machine Learning +type: upgrade +issues: [] diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 750b23caf2151..65a745f0fe36b 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -161,6 +161,7 @@ static TransportVersion def(int id) { public static final TransportVersion ESQL_SKIP_ES_INDEX_SERIALIZATION = def(8_827_00_0); public static final TransportVersion ADD_INDEX_BLOCK_TWO_PHASE = def(8_828_00_0); public static final TransportVersion RESOLVE_CLUSTER_NO_INDEX_EXPRESSION = def(8_829_00_0); + public static final TransportVersion ML_ROLLOVER_LEGACY_INDICES = def(8_830_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationIndex.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationIndex.java index 95753f02e396d..4ab096ca58582 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationIndex.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationIndex.java @@ -49,6 +49,7 @@ public class AnnotationIndex { // Exposed for testing, but always use the aliases in non-test code. public static final String LATEST_INDEX_NAME = ".ml-annotations-000001"; + public static final String INDEX_PATTERN = ".ml-annotations-*"; // Due to historical bugs this index may not have the correct mappings // in some production clusters. Therefore new annotations should be // written to the latest index. If we ever switch to another new annotations diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java index b630bafdbc77d..e85acc159059e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java @@ -66,7 +66,6 @@ public final class MlIndexAndAlias { private static final Logger logger = LogManager.getLogger(MlIndexAndAlias.class); - // Visible for testing static final Comparator INDEX_NAME_COMPARATOR = new Comparator<>() { private final Predicate HAS_SIX_DIGIT_SUFFIX = Pattern.compile("\\d{6}").asMatchPredicate(); @@ -172,7 +171,7 @@ public static void createIndexAndAliasIfNecessary( } else { if (indexPointedByCurrentWriteAlias.isEmpty()) { assert concreteIndexNames.length > 0; - String latestConcreteIndexName = Arrays.stream(concreteIndexNames).max(INDEX_NAME_COMPARATOR).get(); + String latestConcreteIndexName = latestIndex(concreteIndexNames); updateWriteAlias(client, alias, null, latestConcreteIndexName, loggingListener); return; } @@ -279,18 +278,22 @@ private static void createFirstConcreteIndex( ); } - private static void updateWriteAlias( + public static void updateWriteAlias( Client client, String alias, @Nullable String currentIndex, String newIndex, ActionListener listener ) { - logger.info("About to move write alias [{}] from index [{}] to index [{}]", alias, currentIndex, newIndex); + if (currentIndex != null) { + logger.info("About to move write alias [{}] from index [{}] to index [{}]", alias, currentIndex, newIndex); + } else { + logger.info("About to create write alias [{}] for index [{}]", alias, newIndex); + } IndicesAliasesRequestBuilder requestBuilder = client.admin() .indices() .prepareAliases() - .addAliasAction(IndicesAliasesRequest.AliasActions.add().index(newIndex).alias(alias).isHidden(true)); + .addAliasAction(IndicesAliasesRequest.AliasActions.add().index(newIndex).alias(alias).isHidden(true).writeIndex(true)); if (currentIndex != null) { requestBuilder.removeAlias(currentIndex, alias); } @@ -380,4 +383,16 @@ public static void installIndexTemplateIfRequired( public static boolean hasIndexTemplate(ClusterState state, String templateName) { return state.getMetadata().templatesV2().containsKey(templateName); } + + /** + * Returns the latest index. Latest is the index with the highest + * 6 digit suffix. + * @param concreteIndices List of index names + * @return The latest index by index name version suffix + */ + public static String latestIndex(String[] concreteIndices) { + return concreteIndices.length == 1 + ? concreteIndices[0] + : Arrays.stream(concreteIndices).max(MlIndexAndAlias.INDEX_NAME_COMPARATOR).get(); + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java index 8e20ba4bfa9bd..8fc1e55ec0ac5 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java @@ -298,7 +298,7 @@ public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPo assertThat( indicesAliasesRequest.getAliasActions(), contains( - AliasActions.add().alias(TEST_INDEX_ALIAS).index(FIRST_CONCRETE_INDEX).isHidden(true), + AliasActions.add().alias(TEST_INDEX_ALIAS).index(FIRST_CONCRETE_INDEX).isHidden(true).writeIndex(true), AliasActions.remove().alias(TEST_INDEX_ALIAS).index(LEGACY_INDEX_WITHOUT_SUFFIX) ) ); @@ -318,7 +318,7 @@ private void assertMlStateWriteAliasAddedToMostRecentMlStateIndex(List e IndicesAliasesRequest indicesAliasesRequest = aliasesRequestCaptor.getValue(); assertThat( indicesAliasesRequest.getAliasActions(), - contains(AliasActions.add().alias(TEST_INDEX_ALIAS).index(expectedWriteIndexName).isHidden(true)) + contains(AliasActions.add().alias(TEST_INDEX_ALIAS).index(expectedWriteIndexName).isHidden(true).writeIndex(true)) ); } @@ -364,6 +364,11 @@ public void testIndexNameComparator() { assertThat(Stream.of(".a-000002", ".b-000001").max(comparator).get(), equalTo(".a-000002")); } + public void testLatestIndex() { + var names = new String[] { "index-000001", "index-000002", "index-000003" }; + assertThat(MlIndexAndAlias.latestIndex(names), equalTo("index-000003")); + } + private void createIndexAndAliasIfNecessary(ClusterState clusterState) { MlIndexAndAlias.createIndexAndAliasIfNecessary( client, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 08c876dfdcc5d..043a27b7cd147 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -183,6 +183,7 @@ import org.elasticsearch.xpack.core.ml.action.UpgradeJobModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.ValidateDetectorAction; import org.elasticsearch.xpack.core.ml.action.ValidateJobConfigAction; +import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState; import org.elasticsearch.xpack.core.ml.dataframe.analyses.MlDataFrameAnalysisNamedXContentProvider; @@ -1222,7 +1223,25 @@ public Collection createComponents(PluginServices services) { MlAutoUpdateService mlAutoUpdateService = new MlAutoUpdateService( threadPool, - List.of(new DatafeedConfigAutoUpdater(datafeedConfigProvider, indexNameExpressionResolver)) + List.of( + new DatafeedConfigAutoUpdater(datafeedConfigProvider, indexNameExpressionResolver), + new MlIndexRollover( + List.of( + new MlIndexRollover.IndexPatternAndAlias( + AnomalyDetectorsIndex.jobStateIndexPattern(), + AnomalyDetectorsIndex.jobStateIndexWriteAlias() + ), + new MlIndexRollover.IndexPatternAndAlias(MlStatsIndex.indexPattern(), MlStatsIndex.writeAlias()), + new MlIndexRollover.IndexPatternAndAlias(AnnotationIndex.INDEX_PATTERN, AnnotationIndex.WRITE_ALIAS_NAME) + // TODO notifications = https://github.com/elastic/elasticsearch/pull/120064 + // TODO anomaly results + // TODO .ml-inference-XXXXXX - requires alias + // TODO .ml-inference-native-XXXXXX - requires alias (index added in 8.0) + ), + indexNameExpressionResolver, + client + ) + ) ); clusterService.addListener(mlAutoUpdateService); // this object registers as a license state listener, and is never removed, so there's no need to retain another reference to it @@ -2025,6 +2044,9 @@ public void indicesMigrationComplete( new AssociatedIndexDescriptor(MlStatsIndex.indexPattern(), "ML stats index"), new AssociatedIndexDescriptor(".ml-notifications*", "ML notifications indices"), new AssociatedIndexDescriptor(".ml-annotations*", "ML annotations indices") + // TODO should the inference indices be included here? + // new AssociatedIndexDescriptor(".ml-inference-*", "ML Data Frame Analytics") + // new AssociatedIndexDescriptor(".ml-inference-native*", "ML indices for trained models") ); @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAutoUpdateService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAutoUpdateService.java index 94800daebf296..05c4d70e013e9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAutoUpdateService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAutoUpdateService.java @@ -30,7 +30,7 @@ public interface UpdateAction { String getName(); - void runUpdate(); + void runUpdate(ClusterState latestState); } private final List updateActions; @@ -47,27 +47,34 @@ public MlAutoUpdateService(ThreadPool threadPool, List updateActio @Override public void clusterChanged(ClusterChangedEvent event) { - if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + if (event.localNodeMaster() == false) { return; } - if (event.localNodeMaster() == false) { + if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { return; } - TransportVersion minTransportVersion = event.state().getMinTransportVersion(); + if (completedUpdates.size() == updateActions.size()) { + return; // all work complete + } + + final var latestState = event.state(); + TransportVersion minTransportVersion = latestState.getMinTransportVersion(); final List toRun = updateActions.stream() .filter(action -> action.isMinTransportVersionSupported(minTransportVersion)) .filter(action -> completedUpdates.contains(action.getName()) == false) - .filter(action -> action.isAbleToRun(event.state())) + .filter(action -> action.isAbleToRun(latestState)) .filter(action -> currentlyUpdating.add(action.getName())) .toList(); - threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> toRun.forEach(this::runUpdate)); + // TODO run updates serially + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME) + .execute(() -> toRun.forEach((action) -> this.runUpdate(action, latestState))); } - private void runUpdate(UpdateAction action) { + private void runUpdate(UpdateAction action, ClusterState latestState) { try { logger.debug(() -> "[" + action.getName() + "] starting executing update action"); - action.runUpdate(); + action.runUpdate(latestState); this.completedUpdates.add(action.getName()); logger.debug(() -> "[" + action.getName() + "] succeeded executing update action"); } catch (Exception ex) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexRollover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexRollover.java new file mode 100644 index 0000000000000..7dbafdc2676ba --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexRollover.java @@ -0,0 +1,176 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.ml; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.TransportVersion; +import org.elasticsearch.TransportVersions; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.client.internal.OriginSettingClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.IndexVersions; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias; + +import java.util.ArrayList; +import java.util.List; + +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; + +/** + * If any of the indices listed in {@code indicesToRollover} are legacy indices + * then call rollover to produce a new index with the current version. If the + * index does not have an alias the alias is created first. + * If none of the {@code indicesToRollover} exist or they are all non-legacy + * indices then nothing will be updated. + */ +public class MlIndexRollover implements MlAutoUpdateService.UpdateAction { + + private static final Logger logger = LogManager.getLogger(MlIndexRollover.class); + + public record IndexPatternAndAlias(String indexPattern, String alias) {} + + private final IndexNameExpressionResolver expressionResolver; + private final OriginSettingClient client; + private final List indicesToRollover; + + public MlIndexRollover(List indicesToRollover, IndexNameExpressionResolver expressionResolver, Client client) { + this.expressionResolver = expressionResolver; + this.client = new OriginSettingClient(client, ML_ORIGIN); + this.indicesToRollover = indicesToRollover; + } + + @Override + public boolean isMinTransportVersionSupported(TransportVersion minTransportVersion) { + // Wait for all nodes to be upgraded to ensure that the + // newly created index will be of the latest version. + return minTransportVersion.onOrAfter(TransportVersions.ML_ROLLOVER_LEGACY_INDICES); + } + + @Override + public boolean isAbleToRun(ClusterState latestState) { + for (var indexPatternAndAlias : indicesToRollover) { + String[] indices = expressionResolver.concreteIndexNames( + latestState, + IndicesOptions.lenientExpandOpenHidden(), + indexPatternAndAlias.indexPattern + ); + if (indices.length == 0) { + // The index does not exist but the MlAutoUpdateService will + // need to run this action and mark it as done. + // Ignore the missing index and continue the loop + continue; + } + + String latestIndex = MlIndexAndAlias.latestIndex(indices); + IndexRoutingTable routingTable = latestState.getRoutingTable().index(latestIndex); + if (routingTable == null || routingTable.allPrimaryShardsActive() == false) { + return false; + } + } + + return true; + } + + @Override + public String getName() { + return "ml_legacy_index_rollover"; + } + + @Override + public void runUpdate(ClusterState latestState) { + List failures = new ArrayList<>(); + + for (var indexPatternAndAlias : indicesToRollover) { + PlainActionFuture rolloverIndices = new PlainActionFuture<>(); + rolloverLegacyIndices(latestState, indexPatternAndAlias.indexPattern(), indexPatternAndAlias.alias(), rolloverIndices); + try { + rolloverIndices.actionGet(); + } catch (Exception ex) { + logger.warn(() -> "failed rolling over legacy index [" + indexPatternAndAlias.indexPattern() + "]", ex); + if (ex instanceof ElasticsearchException elasticsearchException) { + failures.add( + new ElasticsearchStatusException("Failed rollover", elasticsearchException.status(), elasticsearchException) + ); + } else { + failures.add(new ElasticsearchStatusException("Failed rollover", RestStatus.REQUEST_TIMEOUT, ex)); + } + + break; + } + } + + if (failures.isEmpty()) { + logger.info("ML legacy indies rolled over"); + return; + } + + ElasticsearchException exception = new ElasticsearchException("some error"); + failures.forEach(exception::addSuppressed); + throw exception; + } + + private void rolloverLegacyIndices(ClusterState clusterState, String indexPattern, String alias, ActionListener listener) { + var concreteIndices = expressionResolver.concreteIndexNames(clusterState, IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED, indexPattern); + + if (concreteIndices.length == 0) { + // no matching indices + listener.onResponse(Boolean.FALSE); + return; + } + + String latestIndex = MlIndexAndAlias.latestIndex(concreteIndices); + boolean isCompatibleIndexVersion = isCompatibleIndexVersion(clusterState.metadata().index(latestIndex).getCreationVersion()); + boolean hasAlias = clusterState.getMetadata().hasAlias(alias); + + if (isCompatibleIndexVersion && hasAlias) { + // v8 index with alias, no action required + listener.onResponse(Boolean.FALSE); + return; + } + + SubscribableListener.newForked(l -> { + if (hasAlias == false) { + MlIndexAndAlias.updateWriteAlias(client, alias, null, latestIndex, l); + } else { + l.onResponse(Boolean.TRUE); + } + }).andThen((l, success) -> { + if (isCompatibleIndexVersion == false) { + logger.info("rolling over legacy index [{}] with alias [{}]", latestIndex, alias); + rollover(alias, l); + } else { + l.onResponse(Boolean.TRUE); + } + }).addListener(listener); + } + + private void rollover(String alias, ActionListener listener) { + client.admin().indices().rolloverIndex(new RolloverRequest(alias, null), listener.delegateFailure((l, response) -> { + l.onResponse(Boolean.TRUE); + })); + } + + /** + * True if the version is read *and* write compatible not just read only compatible + */ + static boolean isCompatibleIndexVersion(IndexVersion version) { + return version.onOrAfter(IndexVersions.MINIMUM_COMPATIBLE); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigAutoUpdater.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigAutoUpdater.java index e61ffba9b3164..9fe9a5226f286 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigAutoUpdater.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigAutoUpdater.java @@ -77,7 +77,7 @@ public String getName() { } @Override - public void runUpdate() { + public void runUpdate(ClusterState latestState) { PlainActionFuture> getdatafeeds = new PlainActionFuture<>(); provider.expandDatafeedConfigs("_all", true, null, getdatafeeds); List datafeedConfigBuilders = getdatafeeds.actionGet(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlIndexRolloverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlIndexRolloverTests.java new file mode 100644 index 0000000000000..aa59028a4cc0d --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlIndexRolloverTests.java @@ -0,0 +1,283 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.ml; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse; +import org.elasticsearch.action.admin.indices.alias.TransportIndicesAliasesAction; +import org.elasticsearch.action.admin.indices.rollover.RolloverAction; +import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; +import org.elasticsearch.action.admin.indices.rollover.RolloverResponse; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasMetadata; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.IndexVersions; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.TestIndexNameExpressionResolver; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.List; +import java.util.Map; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +public class MlIndexRolloverTests extends ESTestCase { + + private final IndexNameExpressionResolver indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance(); + + public void testIsAbleToRun_IndicesDoNotExist() { + RoutingTable.Builder routingTable = RoutingTable.builder(); + var rollover = new MlIndexRollover( + List.of( + new MlIndexRollover.IndexPatternAndAlias("my-index1-*", "my-index1-alias"), + new MlIndexRollover.IndexPatternAndAlias("my-index2-*", "my-index2-alias") + ), + indexNameExpressionResolver, + mock(Client.class) + ); + + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); + csBuilder.routingTable(routingTable.build()); + assertTrue(rollover.isAbleToRun(csBuilder.build())); + } + + public void testIsAbleToRun_IndicesHaveNoRouting() { + IndexMetadata.Builder indexMetadata = IndexMetadata.builder("my-index-000001"); + indexMetadata.settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_INDEX_UUID, "_uuid") + ); + + Metadata.Builder metadata = Metadata.builder(); + metadata.put(indexMetadata); + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); + csBuilder.routingTable(RoutingTable.builder().build()); // no routing to index + csBuilder.metadata(metadata); + + var rollover = new MlIndexRollover( + List.of(new MlIndexRollover.IndexPatternAndAlias("my-index-*", "my-index-alias")), + indexNameExpressionResolver, + mock(Client.class) + ); + + assertFalse(rollover.isAbleToRun(csBuilder.build())); + } + + public void testIsAbleToRun_IndicesHaveNoActiveShards() { + String indexName = "my-index-000001"; + IndexMetadata.Builder indexMetadata = IndexMetadata.builder(indexName); + indexMetadata.settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_INDEX_UUID, "_uuid") + ); + Index index = new Index(indexName, "_uuid"); + ShardId shardId = new ShardId(index, 0); + ShardRouting shardRouting = ShardRouting.newUnassigned( + shardId, + true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""), + ShardRouting.Role.DEFAULT + ); + shardRouting = shardRouting.initialize("node_id", null, 0L); + var routingTable = RoutingTable.builder() + .add(IndexRoutingTable.builder(index).addIndexShard(IndexShardRoutingTable.builder(shardId).addShard(shardRouting))) + .build(); + + Metadata.Builder metadata = Metadata.builder(); + metadata.put(indexMetadata); + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); + csBuilder.routingTable(routingTable); + csBuilder.metadata(metadata); + + var rollover = new MlIndexRollover( + List.of(new MlIndexRollover.IndexPatternAndAlias("my-index-*", "my-index-alias")), + indexNameExpressionResolver, + mock(Client.class) + ); + + assertFalse(rollover.isAbleToRun(csBuilder.build())); + } + + public void testRunUpdate_NoMatchingIndices() { + RoutingTable.Builder routingTable = RoutingTable.builder(); + + var client = mock(Client.class); + var rollover = new MlIndexRollover( + List.of( + new MlIndexRollover.IndexPatternAndAlias("my-index1-*", "my-index1-alias"), + new MlIndexRollover.IndexPatternAndAlias("my-index2-*", "my-index2-alias") + ), + indexNameExpressionResolver, + client + ); + + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); + csBuilder.routingTable(routingTable.build()); + rollover.runUpdate(csBuilder.build()); + verify(client).settings(); + verify(client).threadPool(); + verifyNoMoreInteractions(client); + } + + public void testRunUpdate_UpToDateIndicesWithAlias() { + String indexName = "my-index-000001"; + String indexAlias = "my-index-write"; + IndexMetadata.Builder indexMetadata = IndexMetadata.builder(indexName); + indexMetadata.settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_INDEX_UUID, "_uuid") + ); + indexMetadata.putAlias(AliasMetadata.builder(indexAlias).build()); + + Metadata.Builder metadata = Metadata.builder(); + metadata.put(indexMetadata); + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); + csBuilder.metadata(metadata); + + var client = mock(Client.class); + var rollover = new MlIndexRollover( + List.of(new MlIndexRollover.IndexPatternAndAlias("my-index-*", indexAlias)), + indexNameExpressionResolver, + client + ); + + rollover.runUpdate(csBuilder.build()); + // everything up to date so no action for the client + verify(client).settings(); + verify(client).threadPool(); + verifyNoMoreInteractions(client); + } + + public void testRunUpdate_LegacyIndexWithAlias() { + String indexName = "my-index-000001"; + String indexAlias = "my-index-write"; + IndexMetadata.Builder indexMetadata = IndexMetadata.builder(indexName); + indexMetadata.settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersions.V_7_17_0) // cannot read and write to a 7.x index + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_INDEX_UUID, "_uuid") + ); + indexMetadata.putAlias(AliasMetadata.builder(indexAlias).build()); + + Metadata.Builder metadata = Metadata.builder(); + metadata.put(indexMetadata); + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); + csBuilder.metadata(metadata); + + var client = mockClientWithRolloverAndAlias(); + var rollover = new MlIndexRollover( + List.of(new MlIndexRollover.IndexPatternAndAlias("my-index-*", indexAlias)), + indexNameExpressionResolver, + client + ); + + rollover.runUpdate(csBuilder.build()); + verify(client).settings(); + verify(client, times(3)).threadPool(); + verify(client).execute(same(RolloverAction.INSTANCE), any(), any()); // index rolled over + verifyNoMoreInteractions(client); + } + + public void testRunUpdate_LegacyIndexWithoutAlias() { + String indexName = "my-index-000001"; + String indexAlias = "my-index-write"; + IndexMetadata.Builder indexMetadata = IndexMetadata.builder(indexName); + indexMetadata.settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersions.V_7_17_0) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_INDEX_UUID, "_uuid") + ); + // index is missing alias + + Metadata.Builder metadata = Metadata.builder(); + metadata.put(indexMetadata); + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); + csBuilder.metadata(metadata); + + var client = mockClientWithRolloverAndAlias(); + var rollover = new MlIndexRollover( + List.of(new MlIndexRollover.IndexPatternAndAlias("my-index-*", indexAlias)), + indexNameExpressionResolver, + client + ); + + rollover.runUpdate(csBuilder.build()); + verify(client).settings(); + verify(client, times(5)).threadPool(); + verify(client).execute(same(TransportIndicesAliasesAction.TYPE), any(), any()); // alias created + verify(client).execute(same(RolloverAction.INSTANCE), any(), any()); // index rolled over + verifyNoMoreInteractions(client); + } + + public void testIsCompatibleIndexVersion() { + assertTrue(MlIndexRollover.isCompatibleIndexVersion(IndexVersion.current())); + assertTrue(MlIndexRollover.isCompatibleIndexVersion(IndexVersions.MINIMUM_COMPATIBLE)); + assertFalse(MlIndexRollover.isCompatibleIndexVersion(IndexVersions.MINIMUM_READONLY_COMPATIBLE)); + } + + @SuppressWarnings("unchecked") + private Client mockClientWithRolloverAndAlias() { + var client = mock(Client.class); + + doAnswer(invocationOnMock -> { + ActionListener actionListener = (ActionListener) invocationOnMock.getArguments()[2]; + actionListener.onResponse(new RolloverResponse("old", "new", Map.of(), false, true, true, true, true)); + return null; + }).when(client).execute(same(RolloverAction.INSTANCE), any(RolloverRequest.class), any(ActionListener.class)); + + doAnswer(invocationOnMock -> { + ActionListener actionListener = (ActionListener) invocationOnMock + .getArguments()[2]; + actionListener.onResponse(IndicesAliasesResponse.ACKNOWLEDGED_NO_ERRORS); + return null; + }).when(client).execute(same(TransportIndicesAliasesAction.TYPE), any(IndicesAliasesRequest.class), any(ActionListener.class)); + + var threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + when(client.threadPool()).thenReturn(threadPool); + + return client; + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigAutoUpdaterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigAutoUpdaterTests.java index bf6e63faeb6cf..337de5ae7d7a4 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigAutoUpdaterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigAutoUpdaterTests.java @@ -79,7 +79,7 @@ public void testWithSuccessfulUpdates() { withDatafeed(datafeedWithRewrite2, true); DatafeedConfigAutoUpdater updater = new DatafeedConfigAutoUpdater(provider, indexNameExpressionResolver); - updater.runUpdate(); + updater.runUpdate(mock(ClusterState.class)); verify(provider, times(1)).updateDatefeedConfig( eq(datafeedWithRewrite1), @@ -120,7 +120,7 @@ public void testWithUpdateFailures() { }).when(provider).updateDatefeedConfig(eq(datafeedWithRewriteFailure), any(), any(), any(), any()); DatafeedConfigAutoUpdater updater = new DatafeedConfigAutoUpdater(provider, indexNameExpressionResolver); - ElasticsearchException ex = expectThrows(ElasticsearchException.class, updater::runUpdate); + ElasticsearchException ex = expectThrows(ElasticsearchException.class, () -> updater.runUpdate(mock(ClusterState.class))); assertThat(ex.getMessage(), equalTo("some datafeeds failed being upgraded.")); assertThat(ex.getSuppressed().length, equalTo(1)); assertThat(ex.getSuppressed()[0].getMessage(), equalTo("Failed to update datafeed " + datafeedWithRewriteFailure)); @@ -155,7 +155,7 @@ public void testWithNoUpdates() { withDatafeed(datafeedWithoutRewrite2, false); DatafeedConfigAutoUpdater updater = new DatafeedConfigAutoUpdater(provider, indexNameExpressionResolver); - updater.runUpdate(); + updater.runUpdate(mock(ClusterState.class)); verify(provider, times(0)).updateDatefeedConfig(any(), any(DatafeedUpdate.class), eq(Collections.emptyMap()), any(), any()); }