diff --git a/changelog/unreleased/pr-20121.toml b/changelog/unreleased/pr-20121.toml new file mode 100644 index 000000000000..15fcda35f89a --- /dev/null +++ b/changelog/unreleased/pr-20121.toml @@ -0,0 +1,5 @@ +type = "f" +message = "Stop index retention during remote reindex migration" + +pulls = ["20121"] +issues = ["graylog-plugin-enterprise#8097"] diff --git a/graylog-storage-elasticsearch7/src/main/java/org/graylog/storage/elasticsearch7/UnsupportedRemoteReindexingMigrationAdapterES7.java b/graylog-storage-elasticsearch7/src/main/java/org/graylog/storage/elasticsearch7/UnsupportedRemoteReindexingMigrationAdapterES7.java index 86449840e83a..839e1aa93e9a 100644 --- a/graylog-storage-elasticsearch7/src/main/java/org/graylog/storage/elasticsearch7/UnsupportedRemoteReindexingMigrationAdapterES7.java +++ b/graylog-storage-elasticsearch7/src/main/java/org/graylog/storage/elasticsearch7/UnsupportedRemoteReindexingMigrationAdapterES7.java @@ -18,6 +18,7 @@ import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; +import org.graylog2.indexer.IndexSet; import org.graylog2.indexer.datanode.RemoteReindexRequest; import org.graylog2.indexer.datanode.RemoteReindexingMigrationAdapter; import org.graylog2.indexer.migration.IndexerConnectionCheckResult; @@ -30,6 +31,11 @@ public class UnsupportedRemoteReindexingMigrationAdapterES7 implements RemoteRei public static final String UNSUPPORTED_MESSAGE = "This operation should never be called. We remote-reindex into the DataNode that contains OpenSearch. This adapter only exists for API completeness"; + @Override + public boolean isMigrationRunning(IndexSet indexSet) { + return false; // we'll never run a remote reindex migration against elasticsearch target. It's always OS in datanode. + } + @Override public String start(RemoteReindexRequest request) { throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE); @@ -47,6 +53,6 @@ public IndexerConnectionCheckResult checkConnection(@Nonnull URI uri, @Nullable @Override public Optional getLatestMigrationId() { - throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE); + return Optional.empty(); } } diff --git a/graylog-storage-opensearch2/src/main/java/org/graylog/storage/opensearch2/RemoteReindexingMigrationAdapterOS2.java b/graylog-storage-opensearch2/src/main/java/org/graylog/storage/opensearch2/RemoteReindexingMigrationAdapterOS2.java index 1b46c79d78c7..1c748b3dfc70 100644 --- a/graylog-storage-opensearch2/src/main/java/org/graylog/storage/opensearch2/RemoteReindexingMigrationAdapterOS2.java +++ b/graylog-storage-opensearch2/src/main/java/org/graylog/storage/opensearch2/RemoteReindexingMigrationAdapterOS2.java @@ -49,6 +49,7 @@ import org.graylog.shaded.opensearch2.org.opensearch.tasks.Task; import org.graylog2.datanode.RemoteReindexAllowlistEvent; import org.graylog2.events.ClusterEventBus; +import org.graylog2.indexer.IndexSet; import org.graylog2.indexer.IndexSetRegistry; import org.graylog2.indexer.datanode.IndexMigrationConfiguration; import org.graylog2.indexer.datanode.MigrationConfiguration; @@ -85,6 +86,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -133,6 +135,28 @@ public RemoteReindexingMigrationAdapterOS2(final OpenSearchClient client, this.notificationService = notificationService; } + @Override + public boolean isMigrationRunning(IndexSet indexSet) { + return reindexMigrationService.getLatestMigrationId() + .map(this::status) + .map(migration -> isIndexSetCurrentlyMigrated(indexSet, migration)) + .orElse(false); + } + + @Nonnull + private Boolean isIndexSetCurrentlyMigrated(IndexSet indexSet, RemoteReindexMigration mig) { + if (mig.status() == Status.NOT_STARTED || mig.status() == Status.RUNNING) { + final Set runningIndices = mig.indices().stream() + .filter(i -> !i.isCompleted()) + .map(RemoteReindexIndex::name) + .collect(Collectors.toSet()); + final Set migratedIndexSets = indexSetRegistry.getForIndices(runningIndices); + return migratedIndexSets.contains(indexSet); + } else { + return false; + } + } + @Override public String start(RemoteReindexRequest request) { final AggregatedConnectionResponse response = getAllIndicesFrom(request.uri(), request.username(), request.password(), request.trustUnknownCerts()); diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/datanode/RemoteReindexingMigrationAdapter.java b/graylog2-server/src/main/java/org/graylog2/indexer/datanode/RemoteReindexingMigrationAdapter.java index 15df0b77dc83..28e87840a92e 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/datanode/RemoteReindexingMigrationAdapter.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/datanode/RemoteReindexingMigrationAdapter.java @@ -18,6 +18,7 @@ import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; +import org.graylog2.indexer.IndexSet; import org.graylog2.indexer.migration.IndexerConnectionCheckResult; import org.graylog2.indexer.migration.RemoteReindexMigration; @@ -25,6 +26,8 @@ import java.util.Optional; public interface RemoteReindexingMigrationAdapter { + boolean isMigrationRunning(IndexSet indexSet); + enum Status { NOT_STARTED, RUNNING, ERROR, FINISHED } diff --git a/graylog2-server/src/main/java/org/graylog2/periodical/IndexRetentionThread.java b/graylog2-server/src/main/java/org/graylog2/periodical/IndexRetentionThread.java index 9d1b845d0ab7..4abd1f3992d9 100644 --- a/graylog2-server/src/main/java/org/graylog2/periodical/IndexRetentionThread.java +++ b/graylog2-server/src/main/java/org/graylog2/periodical/IndexRetentionThread.java @@ -21,6 +21,7 @@ import org.graylog2.indexer.IndexSet; import org.graylog2.indexer.IndexSetRegistry; import org.graylog2.indexer.cluster.Cluster; +import org.graylog2.indexer.datanode.RemoteReindexingMigrationAdapter; import org.graylog2.indexer.indexset.IndexSetConfig; import org.graylog2.notifications.Notification; import org.graylog2.notifications.NotificationService; @@ -48,6 +49,8 @@ public class IndexRetentionThread extends Periodical { private final Map> retentionStrategyMap; private final DataTieringOrchestrator dataTieringOrchestrator; + private final RemoteReindexingMigrationAdapter migrationService; + @Inject public IndexRetentionThread(ElasticsearchConfiguration configuration, IndexSetRegistry indexSetRegistry, @@ -55,7 +58,7 @@ public IndexRetentionThread(ElasticsearchConfiguration configuration, NodeId nodeId, NotificationService notificationService, Map> retentionStrategyMap, - DataTieringOrchestrator dataTieringOrchestrator) { + DataTieringOrchestrator dataTieringOrchestrator, RemoteReindexingMigrationAdapter migrationService) { this.configuration = configuration; this.indexSetRegistry = indexSetRegistry; this.cluster = cluster; @@ -63,6 +66,7 @@ public IndexRetentionThread(ElasticsearchConfiguration configuration, this.notificationService = notificationService; this.retentionStrategyMap = retentionStrategyMap; this.dataTieringOrchestrator = dataTieringOrchestrator; + this.migrationService = migrationService; } @Override @@ -81,6 +85,12 @@ public void doRun() { LOG.debug("Skipping non-writable index set <{}> ({})", indexSet.getConfig().id(), indexSet.getConfig().title()); continue; } + + if(isCurrentlyMigrated(indexSet)) { + LOG.info("Index set <{}> is currently being migrated, skipping retention", indexSet.getConfig().title()); + continue; + } + final IndexSetConfig config = indexSet.getConfig(); if (config.dataTieringConfig() != null) { dataTieringOrchestrator.retain(indexSet); @@ -99,6 +109,10 @@ public void doRun() { } } + private boolean isCurrentlyMigrated(IndexSet indexSet) { + return migrationService.isMigrationRunning(indexSet); + } + private void retentionProblemNotification(String title, String description) { final Notification notification = notificationService.buildNow() .addNode(nodeId.getNodeId())