Skip to content

Commit

Permalink
[ML] Automatically rollover legacy ml indices (elastic#120405)
Browse files Browse the repository at this point in the history
Roll over ML indices created in 7.x and create new indices that version 9 can 
read and write to. This is required for ML to continue to run during and after 
the upgrade and reindex of 7.x indices.
  • Loading branch information
davidkyle authored Jan 23, 2025
1 parent a1fd7bc commit 928040e
Show file tree
Hide file tree
Showing 11 changed files with 535 additions and 20 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/120405.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 120405
summary: Automatically rollover legacy ml indices
area: Machine Learning
type: upgrade
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_SKIP_ES_INDEX_SERIALIZATION = def(8_827_00_0);
public static final TransportVersion ADD_INDEX_BLOCK_TWO_PHASE = def(8_828_00_0);
public static final TransportVersion RESOLVE_CLUSTER_NO_INDEX_EXPRESSION = def(8_829_00_0);
public static final TransportVersion ML_ROLLOVER_LEGACY_INDICES = def(8_830_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class AnnotationIndex {

// Exposed for testing, but always use the aliases in non-test code.
public static final String LATEST_INDEX_NAME = ".ml-annotations-000001";
public static final String INDEX_PATTERN = ".ml-annotations-*";
// Due to historical bugs this index may not have the correct mappings
// in some production clusters. Therefore new annotations should be
// written to the latest index. If we ever switch to another new annotations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public final class MlIndexAndAlias {

private static final Logger logger = LogManager.getLogger(MlIndexAndAlias.class);

// Visible for testing
static final Comparator<String> INDEX_NAME_COMPARATOR = new Comparator<>() {

private final Predicate<String> HAS_SIX_DIGIT_SUFFIX = Pattern.compile("\\d{6}").asMatchPredicate();
Expand Down Expand Up @@ -172,7 +171,7 @@ public static void createIndexAndAliasIfNecessary(
} else {
if (indexPointedByCurrentWriteAlias.isEmpty()) {
assert concreteIndexNames.length > 0;
String latestConcreteIndexName = Arrays.stream(concreteIndexNames).max(INDEX_NAME_COMPARATOR).get();
String latestConcreteIndexName = latestIndex(concreteIndexNames);
updateWriteAlias(client, alias, null, latestConcreteIndexName, loggingListener);
return;
}
Expand Down Expand Up @@ -279,18 +278,22 @@ private static void createFirstConcreteIndex(
);
}

private static void updateWriteAlias(
public static void updateWriteAlias(
Client client,
String alias,
@Nullable String currentIndex,
String newIndex,
ActionListener<Boolean> listener
) {
logger.info("About to move write alias [{}] from index [{}] to index [{}]", alias, currentIndex, newIndex);
if (currentIndex != null) {
logger.info("About to move write alias [{}] from index [{}] to index [{}]", alias, currentIndex, newIndex);
} else {
logger.info("About to create write alias [{}] for index [{}]", alias, newIndex);
}
IndicesAliasesRequestBuilder requestBuilder = client.admin()
.indices()
.prepareAliases()
.addAliasAction(IndicesAliasesRequest.AliasActions.add().index(newIndex).alias(alias).isHidden(true));
.addAliasAction(IndicesAliasesRequest.AliasActions.add().index(newIndex).alias(alias).isHidden(true).writeIndex(true));
if (currentIndex != null) {
requestBuilder.removeAlias(currentIndex, alias);
}
Expand Down Expand Up @@ -380,4 +383,16 @@ public static void installIndexTemplateIfRequired(
/**
 * Tests whether the V2 index templates held in the cluster state metadata
 * contain an entry keyed by the given name.
 *
 * @param state        the cluster state whose metadata is inspected
 * @param templateName the template name to look up
 * @return {@code true} if a V2 template is registered under {@code templateName}
 */
public static boolean hasIndexTemplate(ClusterState state, String templateName) {
    var v2Templates = state.getMetadata().templatesV2();
    return v2Templates.containsKey(templateName);
}

/**
 * Picks the latest index out of the supplied names. Names are ranked with
 * {@link MlIndexAndAlias#INDEX_NAME_COMPARATOR} and the maximum is returned;
 * when only a single name is supplied it is returned without any comparison.
 *
 * @param concreteIndices Array of index names, expected to be non-empty
 * @return The latest index by index name version suffix
 * @throws java.util.NoSuchElementException if {@code concreteIndices} is empty
 */
public static String latestIndex(String[] concreteIndices) {
    if (concreteIndices.length == 1) {
        // A lone candidate is trivially the latest
        return concreteIndices[0];
    }
    return Arrays.stream(concreteIndices).max(MlIndexAndAlias.INDEX_NAME_COMPARATOR).orElseThrow();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPo
assertThat(
indicesAliasesRequest.getAliasActions(),
contains(
AliasActions.add().alias(TEST_INDEX_ALIAS).index(FIRST_CONCRETE_INDEX).isHidden(true),
AliasActions.add().alias(TEST_INDEX_ALIAS).index(FIRST_CONCRETE_INDEX).isHidden(true).writeIndex(true),
AliasActions.remove().alias(TEST_INDEX_ALIAS).index(LEGACY_INDEX_WITHOUT_SUFFIX)
)
);
Expand All @@ -318,7 +318,7 @@ private void assertMlStateWriteAliasAddedToMostRecentMlStateIndex(List<String> e
IndicesAliasesRequest indicesAliasesRequest = aliasesRequestCaptor.getValue();
assertThat(
indicesAliasesRequest.getAliasActions(),
contains(AliasActions.add().alias(TEST_INDEX_ALIAS).index(expectedWriteIndexName).isHidden(true))
contains(AliasActions.add().alias(TEST_INDEX_ALIAS).index(expectedWriteIndexName).isHidden(true).writeIndex(true))
);
}

Expand Down Expand Up @@ -364,6 +364,11 @@ public void testIndexNameComparator() {
assertThat(Stream.of(".a-000002", ".b-000001").max(comparator).get(), equalTo(".a-000002"));
}

// The name carrying the highest six digit suffix must be reported as the latest index
public void testLatestIndex() {
    String[] candidates = { "index-000001", "index-000002", "index-000003" };
    String latest = MlIndexAndAlias.latestIndex(candidates);
    assertThat(latest, equalTo("index-000003"));
}

private void createIndexAndAliasIfNecessary(ClusterState clusterState) {
MlIndexAndAlias.createIndexAndAliasIfNecessary(
client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@
import org.elasticsearch.xpack.core.ml.action.UpgradeJobModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.ValidateDetectorAction;
import org.elasticsearch.xpack.core.ml.action.ValidateJobConfigAction;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.MlDataFrameAnalysisNamedXContentProvider;
Expand Down Expand Up @@ -1222,7 +1223,25 @@ public Collection<?> createComponents(PluginServices services) {

MlAutoUpdateService mlAutoUpdateService = new MlAutoUpdateService(
threadPool,
List.of(new DatafeedConfigAutoUpdater(datafeedConfigProvider, indexNameExpressionResolver))
List.of(
new DatafeedConfigAutoUpdater(datafeedConfigProvider, indexNameExpressionResolver),
new MlIndexRollover(
List.of(
new MlIndexRollover.IndexPatternAndAlias(
AnomalyDetectorsIndex.jobStateIndexPattern(),
AnomalyDetectorsIndex.jobStateIndexWriteAlias()
),
new MlIndexRollover.IndexPatternAndAlias(MlStatsIndex.indexPattern(), MlStatsIndex.writeAlias()),
new MlIndexRollover.IndexPatternAndAlias(AnnotationIndex.INDEX_PATTERN, AnnotationIndex.WRITE_ALIAS_NAME)
// TODO notifications = https://github.com/elastic/elasticsearch/pull/120064
// TODO anomaly results
// TODO .ml-inference-XXXXXX - requires alias
// TODO .ml-inference-native-XXXXXX - requires alias (index added in 8.0)
),
indexNameExpressionResolver,
client
)
)
);
clusterService.addListener(mlAutoUpdateService);
// this object registers as a license state listener, and is never removed, so there's no need to retain another reference to it
Expand Down Expand Up @@ -2025,6 +2044,9 @@ public void indicesMigrationComplete(
new AssociatedIndexDescriptor(MlStatsIndex.indexPattern(), "ML stats index"),
new AssociatedIndexDescriptor(".ml-notifications*", "ML notifications indices"),
new AssociatedIndexDescriptor(".ml-annotations*", "ML annotations indices")
// TODO should the inference indices be included here?
// new AssociatedIndexDescriptor(".ml-inference-*", "ML Data Frame Analytics")
// new AssociatedIndexDescriptor(".ml-inference-native*", "ML indices for trained models")
);

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public interface UpdateAction {

String getName();

void runUpdate();
void runUpdate(ClusterState latestState);
}

private final List<UpdateAction> updateActions;
Expand All @@ -47,27 +47,34 @@ public MlAutoUpdateService(ThreadPool threadPool, List<UpdateAction> updateActio

@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
if (event.localNodeMaster() == false) {
return;
}
if (event.localNodeMaster() == false) {
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
return;
}

TransportVersion minTransportVersion = event.state().getMinTransportVersion();
if (completedUpdates.size() == updateActions.size()) {
return; // all work complete
}

final var latestState = event.state();
TransportVersion minTransportVersion = latestState.getMinTransportVersion();
final List<UpdateAction> toRun = updateActions.stream()
.filter(action -> action.isMinTransportVersionSupported(minTransportVersion))
.filter(action -> completedUpdates.contains(action.getName()) == false)
.filter(action -> action.isAbleToRun(event.state()))
.filter(action -> action.isAbleToRun(latestState))
.filter(action -> currentlyUpdating.add(action.getName()))
.toList();
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> toRun.forEach(this::runUpdate));
// TODO run updates serially
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)
.execute(() -> toRun.forEach((action) -> this.runUpdate(action, latestState)));
}

private void runUpdate(UpdateAction action) {
private void runUpdate(UpdateAction action, ClusterState latestState) {
try {
logger.debug(() -> "[" + action.getName() + "] starting executing update action");
action.runUpdate();
action.runUpdate(latestState);
this.completedUpdates.add(action.getName());
logger.debug(() -> "[" + action.getName() + "] succeeded executing update action");
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.ml;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias;

import java.util.ArrayList;
import java.util.List;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;

/**
 * If any of the indices listed in {@code indicesToRollover} are legacy indices
 * then call rollover to produce a new index with the current version. If the
 * index does not have an alias the alias is created first.
 * If none of the {@code indicesToRollover} exist or they are all non-legacy
 * indices then nothing will be updated.
 */
public class MlIndexRollover implements MlAutoUpdateService.UpdateAction {

    private static final Logger logger = LogManager.getLogger(MlIndexRollover.class);

    /**
     * Pairs an index name pattern with the write alias that rollover is requested on.
     *
     * @param indexPattern expression resolved to the concrete indices to inspect
     * @param alias        write alias that is created if missing and then rolled over
     */
    public record IndexPatternAndAlias(String indexPattern, String alias) {}

    private final IndexNameExpressionResolver expressionResolver;
    private final OriginSettingClient client;
    private final List<IndexPatternAndAlias> indicesToRollover;

    /**
     * @param indicesToRollover  the pattern/alias pairs to check and, if required, roll over
     * @param expressionResolver resolves each index pattern to concrete index names
     * @param client             client used for alias and rollover requests; it is wrapped so
     *                           that requests are made with the ML origin
     */
    public MlIndexRollover(List<IndexPatternAndAlias> indicesToRollover, IndexNameExpressionResolver expressionResolver, Client client) {
        this.expressionResolver = expressionResolver;
        this.client = new OriginSettingClient(client, ML_ORIGIN);
        this.indicesToRollover = indicesToRollover;
    }

    @Override
    public boolean isMinTransportVersionSupported(TransportVersion minTransportVersion) {
        // Wait for all nodes to be upgraded to ensure that the
        // newly created index will be of the latest version.
        return minTransportVersion.onOrAfter(TransportVersions.ML_ROLLOVER_LEGACY_INDICES);
    }

    /**
     * The action is able to run when, for every configured pattern that matches at
     * least one index, all primary shards of the latest matching index are active.
     * Patterns that match no index do not prevent the action from running.
     *
     * @param latestState the cluster state to inspect
     * @return {@code false} if the latest index of any pattern has no routing table
     *         or has an inactive primary shard, {@code true} otherwise
     */
    @Override
    public boolean isAbleToRun(ClusterState latestState) {
        for (var indexPatternAndAlias : indicesToRollover) {
            String[] indices = expressionResolver.concreteIndexNames(
                latestState,
                IndicesOptions.lenientExpandOpenHidden(),
                indexPatternAndAlias.indexPattern()
            );
            if (indices.length == 0) {
                // The index does not exist but the MlAutoUpdateService will
                // need to run this action and mark it as done.
                // Ignore the missing index and continue the loop
                continue;
            }

            String latestIndex = MlIndexAndAlias.latestIndex(indices);
            IndexRoutingTable routingTable = latestState.getRoutingTable().index(latestIndex);
            if (routingTable == null || routingTable.allPrimaryShardsActive() == false) {
                return false;
            }
        }

        return true;
    }

    @Override
    public String getName() {
        return "ml_legacy_index_rollover";
    }

    /**
     * Processes each configured pattern in order, blocking until the alias
     * creation and/or rollover for that pattern has completed. Processing stops
     * at the first failure.
     *
     * @param latestState the cluster state used to resolve indices and aliases
     * @throws ElasticsearchException if a pattern could not be processed; the
     *         underlying failure is attached as a suppressed exception
     */
    @Override
    public void runUpdate(ClusterState latestState) {
        List<Exception> failures = new ArrayList<>();

        for (var indexPatternAndAlias : indicesToRollover) {
            PlainActionFuture<Boolean> rolloverIndices = new PlainActionFuture<>();
            rolloverLegacyIndices(latestState, indexPatternAndAlias.indexPattern(), indexPatternAndAlias.alias(), rolloverIndices);
            try {
                rolloverIndices.actionGet();
            } catch (Exception ex) {
                logger.warn(() -> "failed rolling over legacy index [" + indexPatternAndAlias.indexPattern() + "]", ex);
                if (ex instanceof ElasticsearchException elasticsearchException) {
                    // Keep the status reported by the original failure
                    failures.add(
                        new ElasticsearchStatusException("Failed rollover", elasticsearchException.status(), elasticsearchException)
                    );
                } else {
                    failures.add(new ElasticsearchStatusException("Failed rollover", RestStatus.REQUEST_TIMEOUT, ex));
                }

                // Do not attempt the remaining patterns after a failure
                break;
            }
        }

        if (failures.isEmpty()) {
            logger.info("ML legacy indices rolled over");
            return;
        }

        ElasticsearchException exception = new ElasticsearchException("Failed to roll over legacy ML indices");
        failures.forEach(exception::addSuppressed);
        throw exception;
    }

    /**
     * Ensures the latest index matching {@code indexPattern} has the write alias
     * and, if that index was created with an incompatible index version, rolls
     * the alias over.
     *
     * @param clusterState state used to resolve the pattern and check for the alias
     * @param indexPattern expression identifying the candidate indices
     * @param alias        the write alias to create and/or roll over
     * @param listener     receives {@code FALSE} when nothing needed to be done and
     *                     {@code TRUE} once the alias/rollover steps have completed
     */
    private void rolloverLegacyIndices(ClusterState clusterState, String indexPattern, String alias, ActionListener<Boolean> listener) {
        // NOTE(review): isAbleToRun resolves the pattern with lenientExpandOpenHidden() whereas
        // LENIENT_EXPAND_OPEN_CLOSED is used here - confirm the difference is intentional.
        var concreteIndices = expressionResolver.concreteIndexNames(clusterState, IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED, indexPattern);

        if (concreteIndices.length == 0) {
            // no matching indices
            listener.onResponse(Boolean.FALSE);
            return;
        }

        String latestIndex = MlIndexAndAlias.latestIndex(concreteIndices);
        boolean isCompatibleIndexVersion = isCompatibleIndexVersion(clusterState.metadata().index(latestIndex).getCreationVersion());
        boolean hasAlias = clusterState.getMetadata().hasAlias(alias);

        if (isCompatibleIndexVersion && hasAlias) {
            // v8 index with alias, no action required
            listener.onResponse(Boolean.FALSE);
            return;
        }

        // Step 1: create the write alias if it is missing.
        // Step 2: roll the alias over if the latest index is a legacy index.
        SubscribableListener.<Boolean>newForked(l -> {
            if (hasAlias == false) {
                MlIndexAndAlias.updateWriteAlias(client, alias, null, latestIndex, l);
            } else {
                l.onResponse(Boolean.TRUE);
            }
        }).<Boolean>andThen((l, success) -> {
            if (isCompatibleIndexVersion == false) {
                logger.info("rolling over legacy index [{}] with alias [{}]", latestIndex, alias);
                rollover(alias, l);
            } else {
                l.onResponse(Boolean.TRUE);
            }
        }).addListener(listener);
    }

    /**
     * Requests a rollover of {@code alias} and reports {@code TRUE} to the
     * listener when the request returns a response. Failures are passed
     * through to the listener.
     */
    private void rollover(String alias, ActionListener<Boolean> listener) {
        client.admin()
            .indices()
            .rolloverIndex(new RolloverRequest(alias, null), listener.delegateFailure((l, response) -> l.onResponse(Boolean.TRUE)));
    }

    /**
     * True if the version is read *and* write compatible not just read only compatible
     */
    static boolean isCompatibleIndexVersion(IndexVersion version) {
        return version.onOrAfter(IndexVersions.MINIMUM_COMPATIBLE);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public String getName() {
}

@Override
public void runUpdate() {
public void runUpdate(ClusterState latestState) {
PlainActionFuture<List<DatafeedConfig.Builder>> getdatafeeds = new PlainActionFuture<>();
provider.expandDatafeedConfigs("_all", true, null, getdatafeeds);
List<DatafeedConfig.Builder> datafeedConfigBuilders = getdatafeeds.actionGet();
Expand Down
Loading

0 comments on commit 928040e

Please sign in to comment.